Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@

public class CountPointProcessor implements PipeProcessor {
private static final String AGGREGATE_SERIES_KEY = "aggregate-series";
private static AtomicLong writePointCount = new AtomicLong(0);
private static final AtomicLong writePointCount = new AtomicLong(0);

private PartialPath aggregateSeries;

@Override
public void validate(PipeParameterValidator validator) throws Exception {
public void validate(PipeParameterValidator validator) {
validator.validateRequiredAttribute(AGGREGATE_SERIES_KEY);
}

Expand All @@ -54,12 +54,9 @@ public void customize(PipeParameters parameters, PipeProcessorRuntimeConfigurati
}

@Override
public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
throws Exception {
public void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector) {
tabletInsertionEvent.processTablet(
(tablet, rowCollector) -> {
writePointCount.addAndGet(tablet.rowSize);
});
(tablet, rowCollector) -> writePointCount.addAndGet(tablet.rowSize));
}

@Override
Expand All @@ -79,5 +76,7 @@ public void process(Event event, EventCollector eventCollector) throws Exception
}

@Override
public void close() throws Exception {}
public void close() {
// Do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
import java.util.HashMap;
import java.util.Map;

/** Test pipe's basic functionalities under multiple cluster and consensus protocol settings. */
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2.class})
/** Test pipe's basic functionalities under multiple cluster and consensus protocol settings. */
public class IoTDBPipeProtocolIT {

private BaseEnv senderEnv;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileStatus;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALFileUtils;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener.Status;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
Expand Down Expand Up @@ -233,7 +234,9 @@ private class DeleteOutdatedFileTask implements Runnable {

private int recursionTime = 0;

public DeleteOutdatedFileTask() {}
public DeleteOutdatedFileTask() {
// Do nothing
}

private void init() {
this.firstValidVersionId = initFirstValidWALVersionId();
Expand Down Expand Up @@ -928,7 +931,7 @@ public boolean isAllWALEntriesConsumed() {
public void rollWALFile() {
WALEntry rollWALFileSignal = new WALSignalEntry(WALEntryType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
WALFlushListener walFlushListener = log(rollWALFileSignal);
if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
if (walFlushListener.waitForResult() == AbstractResultListener.Status.FAILURE) {
logger.error(
"Fail to trigger rolling wal node-{}'s wal file log writer.",
identifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class WALEntryPosition {
// cache for wal entry
private WALInsertNodeCache cache = null;

private static final String ENTRY_NOT_READY_MESSAGE = "This entry isn't ready for read.";

public WALEntryPosition() {}

public WALEntryPosition(String identifier, long walFileVersionId, long position, int size) {
Expand All @@ -71,7 +73,7 @@ public Pair<ByteBuffer, InsertNode> readByteBufferOrInsertNodeViaCacheDirectly()
*/
public InsertNode readInsertNodeViaCacheAfterCanRead() throws IOException {
if (!canRead()) {
throw new IOException("This entry isn't ready for read.");
throw new IOException(ENTRY_NOT_READY_MESSAGE);
}
return cache.getInsertNode(this);
}
Expand All @@ -83,7 +85,7 @@ public InsertNode readInsertNodeViaCacheAfterCanRead() throws IOException {
*/
public ByteBuffer readByteBufferViaCacheAfterCanRead() throws IOException {
if (!canRead()) {
throw new IOException("This entry isn't ready for read.");
throw new IOException(ENTRY_NOT_READY_MESSAGE);
}
return cache.getByteBuffer(this);
}
Expand Down Expand Up @@ -144,7 +146,7 @@ public boolean canRead() {
/** Return true only when this wal file is sealed. */
public boolean isInSealedFile() {
if (walNode == null || !canRead()) {
throw new RuntimeException("This entry isn't ready for read.");
throw new RuntimeException(ENTRY_NOT_READY_MESSAGE);
}
return walFileVersionId < walNode.getCurrentWALFileVersion();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

/** This class helps judge whether wal is flushed to the storage device. */
public class WALFlushListener extends AbstractResultListener {
// handler for pipeline, only exists then value is InsertNode
// handler for pipeline, only exists when value is InsertNode
private final WALEntryHandler walEntryHandler;

public WALFlushListener(boolean wait, WALEntryValue value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;

import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -234,9 +235,7 @@ public void getFlushedValue() throws Exception {
handler.pinMemTable();
walNode1.onMemTableFlushed(memTable);
// wait until wal flushed
while (!walNode1.isAllWALEntriesConsumed()) {
Thread.sleep(50);
}
Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
assertEquals(node1, handler.getInsertNode());
}

Expand Down Expand Up @@ -269,9 +268,7 @@ public void testConcurrentGetValue() throws Exception {
}

// wait until wal flushed
while (!walNode1.isAllWALEntriesConsumed() && !walNode2.isAllWALEntriesConsumed()) {
Thread.sleep(50);
}
Awaitility.await().until(walNode::isAllWALEntriesConsumed);

walFlushListeners.get(0).getWalEntryHandler().pinMemTable();
walNode.onMemTableFlushed(memTable);
Expand Down