From b8c7f5f7d7789efafcde1114a0777caf5c0b59d6 Mon Sep 17 00:00:00 2001 From: ZhangHongYin <46039728+SpriCoder@users.noreply.github.com> Date: Mon, 22 Jul 2024 16:26:53 +0800 Subject: [PATCH 01/29] Fix concurrency problem in Memory Control (#12984) * fix memory concurrency problem * spotless Signed-off-by: OneSizeFitQuorum Co-authored-by: OneSizeFitQuorum (cherry picked from commit 76ba73336161ddadb87983d0f9b7b02611c5643c) --- .../impl/SchemaRegionMemoryImpl.java | 3 +-- .../impl/SchemaRegionPBTreeImpl.java | 3 +-- .../rescon/memory/SystemInfo.java | 20 ++++++++++--------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java index 1db8882326ab5..247d1a0f58e27 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java @@ -202,8 +202,7 @@ public synchronized void init() throws MetadataException { if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) { long memCost = config.getSchemaRatisConsensusLogAppenderBufferSizeMax(); - if (!SystemInfo.getInstance() - .addDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax())) { + if (!SystemInfo.getInstance().addDirectBufferMemoryCost(memCost)) { throw new MetadataException( "Total allocated memory for direct buffer will be " + (SystemInfo.getInstance().getDirectBufferMemoryCost() + memCost) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java index 039be37145dd3..b790545b5d66a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java @@ -195,8 +195,7 @@ public synchronized void init() throws MetadataException { if (config.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) { long memCost = config.getSchemaRatisConsensusLogAppenderBufferSizeMax(); - if (!SystemInfo.getInstance() - .addDirectBufferMemoryCost(config.getSchemaRatisConsensusLogAppenderBufferSizeMax())) { + if (!SystemInfo.getInstance().addDirectBufferMemoryCost(memCost)) { throw new MetadataException( "Total allocated memory for direct buffer will be " + (SystemInfo.getInstance().getDirectBufferMemoryCost() + memCost) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java index 4b0f56f858d62..0458320af052a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java @@ -40,6 +40,7 @@ import java.util.PriorityQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -196,15 +197,16 @@ public synchronized void resetFlushingMemTableCost(long flushingMemTableCost) { } public boolean addDirectBufferMemoryCost(long size) { - while (true) { - long memCost = directBufferMemoryCost.get(); - if (memCost + size > totalDirectBufferMemorySizeLimit) { - return false; - } - if (directBufferMemoryCost.compareAndSet(memCost, memCost + size)) { - return true; - } - } + AtomicBoolean result = new AtomicBoolean(false); + directBufferMemoryCost.updateAndGet( + memCost -> { + if (memCost + size > totalDirectBufferMemorySizeLimit) { + return memCost; + } + result.set(true); + return memCost + size; + }); + return result.get(); } public void decreaseDirectBufferMemoryCost(long size) { From b78692486a4f944cad9b4a12625da3e28031f211 Mon Sep 17 00:00:00 2001 From: Potato Date: Tue, 23 Jul 2024 09:56:14 +0800 Subject: [PATCH 02/29] Ensure the flush total points statistic function works correctly when enable_auto_create_schema is false #12990 Signed-off-by: OneSizeFitQuorum (cherry picked from commit 392625f38bff72fd3c7979fa4f98f0db738e8cde) --- .../metrics/IoTDBInternalLocalReporter.java | 100 +++++++++++++----- 1 file changed, 73 insertions(+), 27 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java index f33624fb0b2b1..2b6478f79596c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java @@ -44,16 +44,19 @@ import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateMultiTimeSeriesStatement; import org.apache.iotdb.metrics.config.MetricConfigDescriptor; import org.apache.iotdb.metrics.reporter.iotdb.IoTDBInternalReporter; import org.apache.iotdb.metrics.utils.InternalReporterType; import org.apache.iotdb.metrics.utils.ReporterType; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.session.util.SessionUtils; import org.apache.thrift.TException; +import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +72,10 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; + public class IoTDBInternalLocalReporter extends IoTDBInternalReporter { + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBInternalLocalReporter.class); private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); private static final Coordinator COORDINATOR = Coordinator.getInstance(); @@ -159,43 +165,83 @@ protected void writeMetricToIoTDB(Map valueMap, String prefix, l service.execute( () -> { try { - TSInsertRecordReq request = new TSInsertRecordReq(); - List measurements = new ArrayList<>(); - List types = new ArrayList<>(); - List values = new ArrayList<>(); - for (Map.Entry entry : valueMap.entrySet()) { - String measurement = entry.getKey(); - Object value = entry.getValue(); - measurements.add(measurement); - types.add(inferType(value)); - values.add(value); + TSStatus result = insertRecord(valueMap, prefix, time); + if (result.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) { + createTimeSeries(valueMap, prefix); + result = insertRecord(valueMap, prefix, time); } - ByteBuffer buffer = SessionUtils.getValueBuffer(types, values); - - request.setPrefixPath(prefix); - request.setTimestamp(time); - request.setMeasurements(measurements); - request.setValues(buffer); - request.setIsAligned(false); - - InsertRowStatement s = StatementGenerator.createStatement(request); - final long queryId = SESSION_MANAGER.requestQueryId(); - ExecutionResult result = - COORDINATOR.executeForTreeModel( - s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher); - if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.error("Failed to update the value of metric with status {}", result.status); + if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.warn("Failed to update the value of metric with status {}", result); } } catch (IoTDBConnectionException e1) { - LOGGER.error( + LOGGER.warn( "Failed to update the value of metric because of connection failure, because ", e1); } catch (IllegalPathException | QueryProcessException e2) { - LOGGER.error( + LOGGER.warn( "Failed to update the value of metric because of internal error, because ", e2); } }); } + private TSStatus insertRecord(Map valueMap, String prefix, long time) + throws IoTDBConnectionException, QueryProcessException, IllegalPathException { + TSInsertRecordReq request = new TSInsertRecordReq(); + List measurements = new ArrayList<>(); + List types = new ArrayList<>(); + List values = new ArrayList<>(); + for (Map.Entry entry : valueMap.entrySet()) { + String measurement = entry.getKey(); + Object value = entry.getValue(); + measurements.add(measurement); + types.add(inferType(value)); + values.add(value); + } + ByteBuffer buffer = SessionUtils.getValueBuffer(types, values); + + request.setPrefixPath(prefix); + request.setTimestamp(time); + request.setMeasurements(measurements); + request.setValues(buffer); + request.setIsAligned(false); + + InsertRowStatement s = StatementGenerator.createStatement(request); + final long queryId = SESSION_MANAGER.requestQueryId(); + ExecutionResult result = + COORDINATOR.executeForTreeModel( + s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher); + return result.status; + } + + private void createTimeSeries(Map valueMap, String prefix) + throws IllegalPathException { + TSCreateMultiTimeseriesReq request = new TSCreateMultiTimeseriesReq(); + List paths = new ArrayList<>(); + List types = new ArrayList<>(); + List encodings = new ArrayList<>(); + List compressors = new ArrayList<>(); + for (Map.Entry entry : valueMap.entrySet()) { + String measurement = entry.getKey(); + paths.add(prefix + "." + measurement); + TSDataType type = inferType(entry.getValue()); + types.add(type.ordinal()); + encodings.add((int) getDefaultEncoding(type).serialize()); + compressors.add((int) TSFileDescriptor.getInstance().getConfig().getCompressor().serialize()); + } + request.setPaths(paths); + request.setDataTypes(types); + request.setEncodings(encodings); + request.setCompressors(compressors); + CreateMultiTimeSeriesStatement s = StatementGenerator.createStatement(request); + final long queryId = SESSION_MANAGER.requestQueryId(); + + ExecutionResult result = + COORDINATOR.executeForTreeModel( + s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher); + if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.warn("Failed to auto create timeseries for {} with status {}", paths, result.status); + } + } + @Override protected void writeMetricsToIoTDB(Map> valueMap, long time) { for (Map.Entry> value : valueMap.entrySet()) { From 8617303ec3f1b6115b306ed52074ea4c2c0c6e71 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <78788603+Pengzna@users.noreply.github.com> Date: Mon, 22 Jul 2024 21:02:16 -0500 Subject: [PATCH 03/29] Pipe/PipeConsensus: Fix invalid retry count in report & enhance log in pipe consensus (#12989) (cherry picked from commit f5f0a3401f8898a8ba6085bb3b1834eafa4bb0bd) --- .../pipeconsensus/PipeConsensusReceiver.java | 13 ++++++++++--- .../pipe/task/subtask/PipeReportableSubtask.java | 2 +- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java index ce289ec341a4d..2480ac702be24 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java @@ -200,7 +200,10 @@ private TPipeConsensusTransferResp preCheckForReceiver(final TPipeConsensusTrans PipeConsensusServerImpl impl = pipeConsensus.getImpl(groupId); if (impl == null) { - String message = String.format("PipeConsensus: unexpected consensusGroupId %s", groupId); + String message = + String.format( + "PipeConsensus-PipeName-%s: unexpected consensusGroupId %s", + consensusPipeName, groupId); if (LOGGER.isErrorEnabled()) { LOGGER.error(message); } @@ -210,7 +213,8 @@ private TPipeConsensusTransferResp preCheckForReceiver(final TPipeConsensusTrans if (impl.isReadOnly()) { String message = String.format( - "PipeConsensus-PipeName-%s: fail to receive because system is read-only.", groupId); + "PipeConsensus-PipeName-%s: fail to receive because system is read-only.", + consensusPipeName); if (LOGGER.isErrorEnabled()) { LOGGER.error(message); } @@ -221,7 +225,7 @@ private TPipeConsensusTransferResp preCheckForReceiver(final TPipeConsensusTrans String message = String.format( "PipeConsensus-PipeName-%s: fail to receive because peer is inactive and not ready.", - groupId); + consensusPipeName); if (LOGGER.isWarnEnabled()) { LOGGER.warn(message); } @@ -1053,6 +1057,7 @@ public PipeConsensusTsFileWriter borrowCorrespondingWriter(TCommitId commitId) { diskBuffer.get().setUsed(true); diskBuffer.get().setCommitIdOfCorrespondingHolderEvent(commitId); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); LOGGER.warn( "PipeConsensus: receiver thread get interrupted when waiting for borrowing tsFileWriter."); } finally { @@ -1074,6 +1079,7 @@ public void handleExit(ConsensusPipeName consensusPipeName) { try { Thread.sleep(RETRY_WAIT_TIME); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); LOGGER.warn( "PipeConsensus-PipeName-{}: receiver thread get interrupted when exiting.", consensusPipeName.toString()); @@ -1373,6 +1379,7 @@ private TPipeConsensusTransferResp onRequest( return resp; } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); LOGGER.warn( "PipeConsensus-PipeName-{}: current waiting is interrupted. onSyncedCommitIndex: {}. Exception: ", consensusPipeName, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java index 462644db4df02..c366b4d62e4f5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java @@ -73,7 +73,7 @@ private void onEnrichedEventFailure(final Throwable throwable) { } retryCount.incrementAndGet(); - if (retryCount.get() <= MAX_RETRY_TIMES) { + if (retryCount.get() <= maxRetryTimes) { LOGGER.warn( "Retry executing subtask {} (creation time: {}, simple class: {}), retry count [{}/{}], last exception: {}", taskID, From 6f1c16edcb338a7a8855f9082930506f4636863e Mon Sep 17 00:00:00 2001 From: Brian Demers Date: Mon, 22 Jul 2024 22:41:42 -0400 Subject: [PATCH 04/29] Replaces the internal maven property with an officially supported alternative (#12982) The property `maven.multiModuleProjectDirectory` is not officially support and can be removed at any time, it has been removed when using mvnd. The current alternative with Maven 3 is using path traversal from the current module's directory Fixes: #12981 (cherry picked from commit cbabf8762467cb4efb6cbf303d7d0ac5c18f89ba) --- distribution/src/assembly/all.xml | 26 +++++++++---------- distribution/src/assembly/cli.xml | 6 ++--- distribution/src/assembly/client-cpp.xml | 2 +- distribution/src/assembly/common-files.xml | 12 ++++----- distribution/src/assembly/confignode.xml | 10 +++---- distribution/src/assembly/datanode.xml | 16 ++++++------ distribution/src/assembly/library-udf.xml | 8 +++--- integration-test/src/assembly/mpp-test.xml | 20 +++++++------- .../confignode/src/assembly/confignode.xml | 4 +-- iotdb-core/datanode/src/assembly/server.xml | 10 +++---- 10 files changed, 57 insertions(+), 57 deletions(-) diff --git a/distribution/src/assembly/all.xml b/distribution/src/assembly/all.xml index 028af358eac42..c6da93929293e 100644 --- a/distribution/src/assembly/all.xml +++ b/distribution/src/assembly/all.xml @@ -48,65 +48,65 @@ conf - ${maven.multiModuleProjectDirectory}/iotdb-core/datanode/src/assembly/resources/conf + ${project.basedir}/../iotdb-core/datanode/src/assembly/resources/conf conf - ${maven.multiModuleProjectDirectory}/iotdb-core/confignode/src/assembly/resources/conf + ${project.basedir}/../iotdb-core/confignode/src/assembly/resources/conf conf - ${maven.multiModuleProjectDirectory}/iotdb-core/node-commons/src/assembly/resources/conf + ${project.basedir}/../iotdb-core/node-commons/src/assembly/resources/conf sbin - ${maven.multiModuleProjectDirectory}/iotdb-core/datanode/src/assembly/resources/sbin + ${project.basedir}/../iotdb-core/datanode/src/assembly/resources/sbin 0755 sbin - ${maven.multiModuleProjectDirectory}/iotdb-core/confignode/src/assembly/resources/sbin + ${project.basedir}/../iotdb-core/confignode/src/assembly/resources/sbin 0755 sbin - ${maven.multiModuleProjectDirectory}/iotdb-core/node-commons/src/assembly/resources/sbin + ${project.basedir}/../iotdb-core/node-commons/src/assembly/resources/sbin 0755 tools - ${maven.multiModuleProjectDirectory}/iotdb-core/datanode/src/assembly/resources/tools + ${project.basedir}/../iotdb-core/datanode/src/assembly/resources/tools 0755 sbin - ${maven.multiModuleProjectDirectory}/iotdb-client/cli/src/assembly/resources/sbin + ${project.basedir}/../iotdb-client/cli/src/assembly/resources/sbin 0755 tools - ${maven.multiModuleProjectDirectory}/iotdb-client/cli/src/assembly/resources/tools + ${project.basedir}/../iotdb-client/cli/src/assembly/resources/tools 0755 - + - + - ${maven.multiModuleProjectDirectory}/iotdb-client/cli/src/assembly/resources/conf/logback-backup.xml + ${project.basedir}/../iotdb-client/cli/src/assembly/resources/conf/logback-backup.xml conf 0755 - + diff --git a/distribution/src/assembly/cli.xml b/distribution/src/assembly/cli.xml index 3566b472148b0..1c62df4735f57 100644 --- a/distribution/src/assembly/cli.xml +++ b/distribution/src/assembly/cli.xml @@ -39,18 +39,18 @@ sbin - ${maven.multiModuleProjectDirectory}/iotdb-client/cli/src/assembly/resources/sbin + ${project.basedir}/../iotdb-client/cli/src/assembly/resources/sbin 0755 tools - ${maven.multiModuleProjectDirectory}/iotdb-client/cli/src/assembly/resources/tools + ${project.basedir}/../iotdb-client/cli/src/assembly/resources/tools 0755 - ${maven.multiModuleProjectDirectory}/iotdb-client/cli/src/assembly/resources/conf/logback-backup.xml + ${project.basedir}/../iotdb-client/cli/src/assembly/resources/conf/logback-backup.xml conf 0755 diff --git a/distribution/src/assembly/client-cpp.xml b/distribution/src/assembly/client-cpp.xml index a7e6316c94b93..0a4dfffbc061e 100644 --- a/distribution/src/assembly/client-cpp.xml +++ b/distribution/src/assembly/client-cpp.xml @@ -28,7 +28,7 @@ apache-iotdb-${project.version}-client-cpp-${os.classifier}-bin - ${maven.multiModuleProjectDirectory}/iotdb-client/client-cpp/target/client-cpp-${project.version}-cpp-${os.classifier} + ${project.basedir}/../iotdb-client/client-cpp/target/client-cpp-${project.version}-cpp-${os.classifier} ${file.separator} diff --git a/distribution/src/assembly/common-files.xml b/distribution/src/assembly/common-files.xml index 4befd2a8d9bb0..e0c0aa3249a6f 100644 --- a/distribution/src/assembly/common-files.xml +++ b/distribution/src/assembly/common-files.xml @@ -22,27 +22,27 @@ - ${maven.multiModuleProjectDirectory}/licenses + ${project.basedir}/../licenses licenses - ${maven.multiModuleProjectDirectory}/README.md + ${project.basedir}/../README.md - ${maven.multiModuleProjectDirectory}/README_ZH.md + ${project.basedir}/../README_ZH.md - ${maven.multiModuleProjectDirectory}/LICENSE-binary + ${project.basedir}/../LICENSE-binary LICENSE - ${maven.multiModuleProjectDirectory}/NOTICE-binary + ${project.basedir}/../NOTICE-binary NOTICE - ${maven.multiModuleProjectDirectory}/RELEASE_NOTES.md + ${project.basedir}/../RELEASE_NOTES.md diff --git a/distribution/src/assembly/confignode.xml b/distribution/src/assembly/confignode.xml index 502b90758cae2..abe88fce388a5 100644 --- a/distribution/src/assembly/confignode.xml +++ b/distribution/src/assembly/confignode.xml @@ -38,25 +38,25 @@ - ${maven.multiModuleProjectDirectory}/iotdb-core/confignode/src/assembly/resources/sbin + ${project.basedir}/../iotdb-core/confignode/src/assembly/resources/sbin sbin 0755 sbin - ${maven.multiModuleProjectDirectory}/iotdb-core/node-commons/src/assembly/resources/sbin + ${project.basedir}/../iotdb-core/node-commons/src/assembly/resources/sbin 0755 - ${maven.multiModuleProjectDirectory}/iotdb-core/confignode/src/assembly/resources/conf + ${project.basedir}/../iotdb-core/confignode/src/assembly/resources/conf conf conf - ${maven.multiModuleProjectDirectory}/iotdb-core/node-commons/src/assembly/resources/conf + ${project.basedir}/../iotdb-core/node-commons/src/assembly/resources/conf - + diff --git a/distribution/src/assembly/datanode.xml b/distribution/src/assembly/datanode.xml index 84697fda6d742..9075bea61e445 100644 --- a/distribution/src/assembly/datanode.xml +++ b/distribution/src/assembly/datanode.xml @@ -40,41 +40,41 @@ - + conf - ${maven.multiModuleProjectDirectory}/iotdb-core/node-commons/src/assembly/resources/conf + ${project.basedir}/../iotdb-core/node-commons/src/assembly/resources/conf sbin - ${maven.multiModuleProjectDirectory}/iotdb-core/datanode/src/assembly/resources/sbin + ${project.basedir}/../iotdb-core/datanode/src/assembly/resources/sbin 0755 sbin - ${maven.multiModuleProjectDirectory}/iotdb-core/node-commons/src/assembly/resources/sbin + ${project.basedir}/../iotdb-core/node-commons/src/assembly/resources/sbin 0755 tools - ${maven.multiModuleProjectDirectory}/iotdb-core/datanode/src/assembly/resources/tools + ${project.basedir}/../iotdb-core/datanode/src/assembly/resources/tools 0755 sbin - ${maven.multiModuleProjectDirectory}/iotdb-client/cli/src/assembly/resources/sbin + ${project.basedir}/../iotdb-client/cli/src/assembly/resources/sbin 0755 tools - ${maven.multiModuleProjectDirectory}/iotdb-client/cli/src/assembly/resources/tools + ${project.basedir}/../iotdb-client/cli/src/assembly/resources/tools 0755 - ${maven.multiModuleProjectDirectory}/iotdb-core/datanode/src/assembly/resources/conf/datanode-env.sh + ${project.basedir}/../iotdb-core/datanode/src/assembly/resources/conf/datanode-env.sh conf/datanode-env.sh 0755 diff --git a/distribution/src/assembly/library-udf.xml b/distribution/src/assembly/library-udf.xml index 300f710b38310..bde1d0330a0ba 100644 --- a/distribution/src/assembly/library-udf.xml +++ b/distribution/src/assembly/library-udf.xml @@ -38,23 +38,23 @@ - ${maven.multiModuleProjectDirectory}/library-udf/src/assembly/tools + ${project.basedir}/../library-udf/src/assembly/tools 0755 - ${maven.multiModuleProjectDirectory}/licenses + ${project.basedir}/../licenses licenses - ${maven.multiModuleProjectDirectory}/LICENSE-binary + ${project.basedir}/../LICENSE-binary licenses LICENSE - ${maven.multiModuleProjectDirectory}/NOTICE-binary + ${project.basedir}/../NOTICE-binary licenses NOTICE diff --git a/integration-test/src/assembly/mpp-test.xml b/integration-test/src/assembly/mpp-test.xml index 509c38b745b49..3dc443c8d03f3 100644 --- a/integration-test/src/assembly/mpp-test.xml +++ b/integration-test/src/assembly/mpp-test.xml @@ -28,49 +28,49 @@ conf - ${maven.multiModuleProjectDirectory}/iotdb-core/datanode/src/assembly/resources/conf + ${project.basedir}/../iotdb-core/datanode/src/assembly/resources/conf conf - ${maven.multiModuleProjectDirectory}/iotdb-core/confignode/src/assembly/resources/conf + ${project.basedir}/../iotdb-core/confignode/src/assembly/resources/conf conf - ${maven.multiModuleProjectDirectory}/iotdb-core/node-commons/src/assembly/resources/conf + ${project.basedir}/../iotdb-core/node-commons/src/assembly/resources/conf conf - ${maven.multiModuleProjectDirectory}/iotdb-core/metrics/interface/src/main/assembly/resources/conf + ${project.basedir}/../iotdb-core/metrics/interface/src/main/assembly/resources/conf sbin - ${maven.multiModuleProjectDirectory}/iotdb-core/datanode/src/assembly/resources/sbin + ${project.basedir}/../iotdb-core/datanode/src/assembly/resources/sbin 0755 sbin - ${maven.multiModuleProjectDirectory}/iotdb-core/confignode/src/assembly/resources/sbin + ${project.basedir}/../iotdb-core/confignode/src/assembly/resources/sbin 0755 tools - ${maven.multiModuleProjectDirectory}/iotdb-core/datanode/src/assembly/resources/tools + ${project.basedir}/../iotdb-core/datanode/src/assembly/resources/tools 0755 sbin - ${maven.multiModuleProjectDirectory}/iotdb-client/cli/src/assembly/resources/sbin + ${project.basedir}/../iotdb-client/cli/src/assembly/resources/sbin 0755 tools - ${maven.multiModuleProjectDirectory}/iotdb-client/cli/src/assembly/resources/tools + ${project.basedir}/../iotdb-client/cli/src/assembly/resources/tools 0755 - ${maven.multiModuleProjectDirectory}/iotdb-core/datanode/src/assembly/resources/conf/datanode-env.sh + ${project.basedir}/../iotdb-core/datanode/src/assembly/resources/conf/datanode-env.sh conf/datanode-env.sh 0755 diff --git a/iotdb-core/confignode/src/assembly/confignode.xml b/iotdb-core/confignode/src/assembly/confignode.xml index 242e3fb810d02..638e47934752f 100644 --- a/iotdb-core/confignode/src/assembly/confignode.xml +++ b/iotdb-core/confignode/src/assembly/confignode.xml @@ -44,11 +44,11 @@ - ${maven.multiModuleProjectDirectory}/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties + ${project.basedir}/../node-commons/src/assembly/resources/conf/iotdb-system.properties conf/iotdb-system.properties - ${maven.multiModuleProjectDirectory}/iotdb-core/node-commons/src/assembly/resources/sbin/iotdb-common.sh + ${project.basedir}/../node-commons/src/assembly/resources/sbin/iotdb-common.sh sbin/iotdb-common.sh diff --git a/iotdb-core/datanode/src/assembly/server.xml b/iotdb-core/datanode/src/assembly/server.xml index 96bfdd92748a6..fc3b1ccf2a1d8 100644 --- a/iotdb-core/datanode/src/assembly/server.xml +++ b/iotdb-core/datanode/src/assembly/server.xml @@ -39,24 +39,24 @@ - ${maven.multiModuleProjectDirectory}/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties + ${project.basedir}/../node-commons/src/assembly/resources/conf/iotdb-system.properties conf/iotdb-system.properties - ${maven.multiModuleProjectDirectory}/iotdb-core/node-commons/src/assembly/resources/sbin/iotdb-common.sh + ${project.basedir}/../node-commons/src/assembly/resources/sbin/iotdb-common.sh sbin/iotdb-common.sh - ${maven.multiModuleProjectDirectory}/iotdb-core/node-commons/src/assembly/resources/sbin/iotdb-common.sh + ${project.basedir}/../node-commons/src/assembly/resources/sbin/iotdb-common.sh tools/iotdb-common.sh - ${maven.multiModuleProjectDirectory}/iotdb-core/node-commons/src/assembly/resources/sbin/iotdb-common.sh + ${project.basedir}/../node-commons/src/assembly/resources/sbin/iotdb-common.sh tools/tsfileToolSet/iotdb-common.sh - ${maven.multiModuleProjectDirectory}/iotdb-core/node-commons/src/assembly/resources/sbin/iotdb-common.sh + ${project.basedir}/../node-commons/src/assembly/resources/sbin/iotdb-common.sh tools/schema/iotdb-common.sh From d9e46b592540b8fc4aed36964c0de1c05bea76e5 Mon Sep 17 00:00:00 2001 From: shuwenwei <55970239+shuwenwei@users.noreply.github.com> Date: Tue, 23 Jul 2024 11:27:03 +0800 Subject: [PATCH 05/29] Reduce error log when compaction interrupted (#12985) * reduce error log when compaction interrupted * fix thread pool (cherry picked from commit 27c2f13324e293561f8e3f8bb51098e61135d434) --- .../performer/impl/FastCompactionPerformer.java | 9 +++++++-- .../schedule/CompactionScheduleTaskManager.java | 16 ++++++++++------ .../schedule/CompactionTaskManager.java | 2 ++ 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java index cb13afa3fc2c8..be702de8e664d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java @@ -44,6 +44,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; +import org.apache.tsfile.exception.StopReadTsFileByInterruptException; import org.apache.tsfile.exception.write.PageException; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.TsFileSequenceReader; @@ -269,8 +270,12 @@ private void compactNonAlignedSeries( futures.get(i).get(); subTaskSummary.increase(taskSummaryList.get(i)); } catch (ExecutionException e) { - if (e.getCause() instanceof CompactionLastTimeCheckFailedException) { - throw (CompactionLastTimeCheckFailedException) e.getCause(); + Throwable cause = e.getCause(); + if (cause instanceof CompactionLastTimeCheckFailedException) { + throw (CompactionLastTimeCheckFailedException) cause; + } + if (cause instanceof StopReadTsFileByInterruptException) { + throw (StopReadTsFileByInterruptException) cause; } throw new IOException("[Compaction] SubCompactionTask meet errors ", e); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java index 8dbcf94fa18fa..691318a6f4a3a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; +import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.service.IService; @@ -42,7 +43,6 @@ import java.util.Vector; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -55,7 +55,7 @@ public class CompactionScheduleTaskManager implements IService { private final int ttlCheckerNum = IoTDBDescriptor.getInstance().getConfig().getTTlCheckerNum(); - private ExecutorService compactionScheduleTaskThreadPool; + private WrappedThreadPoolExecutor compactionScheduleTaskThreadPool; private static final Logger logger = LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); private static final CompactionScheduleTaskManager INSTANCE = new CompactionScheduleTaskManager(); @@ -179,8 +179,10 @@ public void waitAndStop(long milliseconds) { private void initThreadPool() { this.compactionScheduleTaskThreadPool = - IoTDBThreadPoolFactory.newFixedThreadPool( - compactionSelectorNum + ttlCheckerNum, ThreadName.COMPACTION_SCHEDULE.getName()); + (WrappedThreadPoolExecutor) + IoTDBThreadPoolFactory.newFixedThreadPool( + compactionSelectorNum + ttlCheckerNum, ThreadName.COMPACTION_SCHEDULE.getName()); + this.compactionScheduleTaskThreadPool.disableErrorLog(); init = true; } @@ -189,8 +191,10 @@ private void restartThreadPool() { compactionScheduleTaskThreadPool.shutdownNow(); waitForThreadPoolTerminated(); compactionScheduleTaskThreadPool = - IoTDBThreadPoolFactory.newFixedThreadPool( - compactionSelectorNum + ttlCheckerNum, ThreadName.COMPACTION_SCHEDULE.getName()); + (WrappedThreadPoolExecutor) + IoTDBThreadPoolFactory.newFixedThreadPool( + compactionSelectorNum + ttlCheckerNum, ThreadName.COMPACTION_SCHEDULE.getName()); + compactionScheduleTaskThreadPool.disableErrorLog(); startScheduleTasks(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java index 27eadff48742c..b04c2a8b7bef7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java @@ -138,12 +138,14 @@ private void initThreadPool() { (WrappedThreadPoolExecutor) IoTDBThreadPoolFactory.newFixedThreadPool( compactionThreadNum, ThreadName.COMPACTION_WORKER.getName()); + this.taskExecutionPool.disableErrorLog(); this.subCompactionTaskExecutionPool = (WrappedThreadPoolExecutor) IoTDBThreadPoolFactory.newFixedThreadPool( compactionThreadNum * IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(), ThreadName.COMPACTION_SUB_TASK.getName()); + this.subCompactionTaskExecutionPool.disableErrorLog(); for (int i = 0; i < compactionThreadNum; ++i) { taskExecutionPool.submit(new CompactionWorker(i, candidateCompactionTaskQueue)); } From c36e9d7ec98fc3710769e524c526ac7a3af72f07 Mon Sep 17 00:00:00 2001 From: Jiang Tian Date: Tue, 23 Jul 2024 11:42:55 +0800 Subject: [PATCH 06/29] Remove UNKNOWN from WALFileVersion which fails recovering V1 WAL file (#12986) * Remove UNKNOWN from WALFileVersion which fails recovering V1 WAL file * fix ut * Detail print (cherry picked from commit abb1f6ad12b0a4b6260dd20a283c696e37b93f37) --- .../dataregion/wal/WALManager.java | 2 +- .../dataregion/wal/io/LogWriter.java | 9 +--- .../dataregion/wal/io/WALFileVersion.java | 54 ++++++++++++------- .../dataregion/wal/io/WALInputStream.java | 24 ++++----- .../dataregion/wal/io/WALMetaData.java | 18 +++---- .../dataregion/wal/io/WALWriter.java | 14 ++--- .../wal/recover/WALRepairWriter.java | 30 ++++------- .../wal/compression/WALCompressionTest.java | 13 ++--- .../wal/recover/WALRepairWriterTest.java | 7 ++- 9 files changed, 83 insertions(+), 88 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java index 37f6330f2b3e9..bff78d50e5bf8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java @@ -181,7 +181,7 @@ private void deleteOutdatedFiles() { deleteOutdatedFilesInWALNodes(); if (firstLoop && shouldThrottle()) { logger.warn( - "WAL disk usage {} is larger than the iot_consensus_throttle_threshold_in_byte {}, please check your write load, iot consensus and the pipe module. It's better to allocate more disk for WAL.", + "WAL disk usage {} is larger than the iot_consensus_throttle_threshold_in_byte * 0.8 {}, please check your write load, iot consensus and the pipe module. It's better to allocate more disk for WAL.", getTotalDiskUsage(), getThrottleThreshold()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java index 668da2bbd23e0..278fe93cae07f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java @@ -34,7 +34,6 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; -import java.nio.charset.StandardCharsets; /** * LogWriter writes the binary logs into a file, including writing {@link WALEntry} into .wal file @@ -69,12 +68,8 @@ protected LogWriter(File logFile, WALFileVersion version) throws IOException { this.logFile = logFile; this.logStream = new FileOutputStream(logFile, true); this.logChannel = this.logStream.getChannel(); - if (!logFile.exists() || logFile.length() == 0) { - this.logChannel.write( - ByteBuffer.wrap( - version == WALFileVersion.V1 - ? WALWriter.MAGIC_STRING_V1.getBytes(StandardCharsets.UTF_8) - : WALWriter.MAGIC_STRING_V2.getBytes(StandardCharsets.UTF_8))); + if ((!logFile.exists() || logFile.length() == 0) && version == WALFileVersion.V2) { + this.logChannel.write(ByteBuffer.wrap(version.getVersionBytes())); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java index 59e7a5534ce49..e3d374551b115 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileVersion.java @@ -25,9 +25,26 @@ import java.nio.charset.StandardCharsets; public enum WALFileVersion { - V1, - V2, - UNKNOWN; + V1("WAL"), + V2("V2-WAL"); + + private final String versionString; + private byte[] versionBytes; + + public String getVersionString() { + return versionString; + } + + public byte[] getVersionBytes() { + return versionBytes; + } + + WALFileVersion(String versionString) { + this.versionString = versionString; + if (versionString != null) { + this.versionBytes = versionString.getBytes(StandardCharsets.UTF_8); + } + } public static WALFileVersion getVersion(File file) throws IOException { try (FileChannel channel = FileChannel.open(file.toPath())) { @@ -38,22 +55,23 @@ public static WALFileVersion getVersion(File file) throws IOException { public static WALFileVersion getVersion(FileChannel channel) throws IOException { long originalPosition = channel.position(); try { - channel.position(0); - ByteBuffer buffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_V2_BYTES); - channel.read(buffer); - buffer.flip(); - if (buffer.remaining() < WALWriter.MAGIC_STRING_V2_BYTES) { - return UNKNOWN; - } - String version = new String(buffer.array(), StandardCharsets.UTF_8); - switch (version) { - case WALWriter.MAGIC_STRING_V2: - return V2; - case WALWriter.MAGIC_STRING_V1: - return V1; - default: - return UNKNOWN; + // head magic string starts to exist since V2 + WALFileVersion[] versions = {V2}; + for (WALFileVersion version : versions) { + channel.position(0); + if (channel.size() < version.versionBytes.length) { + continue; + } + ByteBuffer buffer = ByteBuffer.allocate(version.versionBytes.length); + channel.read(buffer); + buffer.flip(); + String versionString = new String(buffer.array(), StandardCharsets.UTF_8); + if (version.versionString.equals(versionString)) { + return version; + } } + // v1 by default + return V1; } finally { channel.position(originalPosition); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java index 14854a056230d..0b10876d6dbd2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java @@ -67,7 +67,7 @@ public WALInputStream(File logFile) throws IOException { } private void getEndOffset() throws IOException { - if (channel.size() < WALWriter.MAGIC_STRING_V2_BYTES + Integer.BYTES) { + if (channel.size() < WALFileVersion.V2.getVersionBytes().length + Integer.BYTES) { // An broken file endOffset = channel.size(); return; @@ -77,18 +77,18 @@ private void getEndOffset() throws IOException { try { if (version == WALFileVersion.V2) { // New Version - ByteBuffer magicStringBuffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_V2_BYTES); - channel.read(magicStringBuffer, channel.size() - WALWriter.MAGIC_STRING_V2_BYTES); + ByteBuffer magicStringBuffer = ByteBuffer.allocate(version.getVersionBytes().length); + channel.read(magicStringBuffer, channel.size() - version.getVersionBytes().length); magicStringBuffer.flip(); if (logFile.getName().endsWith(IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX) || !new String(magicStringBuffer.array(), StandardCharsets.UTF_8) - .equals(WALWriter.MAGIC_STRING_V2)) { + .equals(version.getVersionString())) { // This is a broken wal or checkpoint file endOffset = channel.size(); return; } else { // This is a normal wal file or check point file - position = channel.size() - WALWriter.MAGIC_STRING_V2_BYTES - Integer.BYTES; + position = channel.size() - version.getVersionBytes().length - Integer.BYTES; } } else { if (logFile.getName().endsWith(IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX)) { @@ -97,16 +97,16 @@ private void getEndOffset() throws IOException { return; } // Old version - ByteBuffer magicStringBuffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_V1_BYTES); - channel.read(magicStringBuffer, channel.size() - WALWriter.MAGIC_STRING_V1_BYTES); + ByteBuffer magicStringBuffer = ByteBuffer.allocate(version.getVersionBytes().length); + channel.read(magicStringBuffer, channel.size() - version.getVersionBytes().length); magicStringBuffer.flip(); if (!new String(magicStringBuffer.array(), StandardCharsets.UTF_8) - .equals(WALWriter.MAGIC_STRING_V1)) { + .equals(version.getVersionString())) { // this is a broken wal file endOffset = channel.size(); return; } else { - position = channel.size() - WALWriter.MAGIC_STRING_V1_BYTES - Integer.BYTES; + position = channel.size() - version.getVersionBytes().length - Integer.BYTES; } } // Read the metadata size @@ -115,11 +115,11 @@ private void getEndOffset() throws IOException { int metadataSize = metadataSizeBuf.getInt(); // -1 is for the endmarker endOffset = - channel.size() - WALWriter.MAGIC_STRING_V2_BYTES - Integer.BYTES - metadataSize - 1; + channel.size() - version.getVersionBytes().length - Integer.BYTES - metadataSize - 1; } finally { if (version == WALFileVersion.V2) { // Set the position back to the end of head magic string - channel.position(WALWriter.MAGIC_STRING_V2_BYTES); + channel.position(version.getVersionBytes().length); } else { // There is no head magic string in V1 version channel.position(0); @@ -275,7 +275,7 @@ private void tryLoadSegment() throws IOException { */ public void skipToGivenLogicalPosition(long pos) throws IOException { if (version == WALFileVersion.V2) { - channel.position(WALWriter.MAGIC_STRING_V2_BYTES); + channel.position(version.getVersionBytes().length); long posRemain = pos; SegmentInfo segmentInfo; do { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java index 846fc56e949a5..ba9211656ef03 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALMetaData.java @@ -131,7 +131,8 @@ public long getFirstSearchIndex() { } public static WALMetaData readFromWALFile(File logFile, FileChannel channel) throws IOException { - if (channel.size() < WALWriter.MAGIC_STRING_V2_BYTES || !isValidMagicString(channel)) { + if (channel.size() < WALFileVersion.V2.getVersionBytes().length + || !isValidMagicString(channel)) { throw new BrokenWALFileException(logFile); } @@ -141,12 +142,7 @@ public static WALMetaData readFromWALFile(File logFile, FileChannel channel) thr try { ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES); WALFileVersion version = WALFileVersion.getVersion(channel); - position = - channel.size() - - Integer.BYTES - - (version == WALFileVersion.V2 - ? WALWriter.MAGIC_STRING_V2_BYTES - : WALWriter.MAGIC_STRING_V1_BYTES); + position = channel.size() - Integer.BYTES - (version.getVersionBytes().length); channel.read(metadataSizeBuf, position); metadataSizeBuf.flip(); // load metadata @@ -178,12 +174,12 @@ public static WALMetaData readFromWALFile(File logFile, FileChannel channel) thr } private static boolean isValidMagicString(FileChannel channel) throws IOException { - ByteBuffer magicStringBytes = ByteBuffer.allocate(WALWriter.MAGIC_STRING_V2_BYTES); - channel.read(magicStringBytes, channel.size() - WALWriter.MAGIC_STRING_V2_BYTES); + ByteBuffer magicStringBytes = ByteBuffer.allocate(WALFileVersion.V2.getVersionBytes().length); + channel.read(magicStringBytes, channel.size() - WALFileVersion.V2.getVersionBytes().length); magicStringBytes.flip(); String magicString = new String(magicStringBytes.array(), StandardCharsets.UTF_8); - return magicString.equals(WALWriter.MAGIC_STRING_V2) - || magicString.contains(WALWriter.MAGIC_STRING_V1); + return magicString.equals(WALFileVersion.V2.getVersionString()) + || magicString.contains(WALFileVersion.V1.getVersionString()); } public void setTruncateOffSet(long offset) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java index 4d2d522c45dcd..f23a6a4391e90 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java @@ -27,17 +27,10 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; /** WALWriter writes the binary {@link WALEntry} into .wal file. */ public class WALWriter extends LogWriter { - public static final String MAGIC_STRING_V1 = "WAL"; - public static final String MAGIC_STRING_V2 = "V2-WAL"; - public static final int MAGIC_STRING_V1_BYTES = - MAGIC_STRING_V1.getBytes(StandardCharsets.UTF_8).length; - public static final int MAGIC_STRING_V2_BYTES = - MAGIC_STRING_V2.getBytes(StandardCharsets.UTF_8).length; private WALFileStatus walFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX; // wal files' metadata protected final WALMetaData metaData = new WALMetaData(); @@ -50,6 +43,7 @@ public WALWriter(File logFile) throws IOException { public WALWriter(File logFile, WALFileVersion version) throws IOException { super(logFile, version); + this.version = version; } /** @@ -76,16 +70,14 @@ private void endFile() throws IOException { endMarker.serializedSize() + metaDataSize + Integer.BYTES - + (version != WALFileVersion.V2 ? MAGIC_STRING_V1_BYTES : MAGIC_STRING_V2_BYTES)); + + version.getVersionBytes().length); // mark info part ends endMarker.serialize(buffer); // flush meta data metaData.serialize(buffer); buffer.putInt(metaDataSize); // add magic string - buffer.put( - (version != WALFileVersion.V2 ? MAGIC_STRING_V1 : MAGIC_STRING_V2) - .getBytes(StandardCharsets.UTF_8)); + buffer.put(version.getVersionBytes()); writeMetadata(buffer); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java index c60c4d1a80956..d1b27060c90b6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriter.java @@ -30,11 +30,6 @@ import java.nio.charset.StandardCharsets; import java.nio.file.StandardOpenOption; -import static org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V1; -import static org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V1_BYTES; -import static org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V2; -import static org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V2_BYTES; - /** Check whether the wal file is broken and repair it. */ public class WALRepairWriter { private final File logFile; @@ -47,21 +42,12 @@ public void repair(WALMetaData metaData) throws IOException { // locate broken data long truncateSize; WALFileVersion version = WALFileVersion.getVersion(logFile); - if (version == WALFileVersion.UNKNOWN) { - truncateSize = 0; - } else if (version == WALFileVersion.V2) { - if (readTailMagic(MAGIC_STRING_V2_BYTES).equals(MAGIC_STRING_V2)) { // complete file - return; - } else { // file with broken magic string - truncateSize = metaData.getTruncateOffSet(); - } - } else { - if (readTailMagic(MAGIC_STRING_V1_BYTES).contains(MAGIC_STRING_V1)) { - return; - } else { - truncateSize = metaData.getTruncateOffSet(); - } + if (version.getVersionString().equals(readTailMagic(version))) { // complete file + return; + } else { // file with broken magic string + truncateSize = metaData.getTruncateOffSet(); } + // truncate broken data try (FileChannel channel = FileChannel.open(logFile.toPath(), StandardOpenOption.APPEND)) { channel.truncate(truncateSize); @@ -72,7 +58,11 @@ public void repair(WALMetaData metaData) throws IOException { } } - private String readTailMagic(int size) throws IOException { + private String readTailMagic(WALFileVersion version) throws IOException { + int size = version.getVersionBytes().length; + if (logFile.length() < size) { + return null; + } try (FileChannel channel = FileChannel.open(logFile.toPath(), StandardOpenOption.READ)) { ByteBuffer magicStringBytes = ByteBuffer.allocate(size); channel.read(magicStringBytes, channel.size() - size); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java index d723f2e503844..11f1a1d6859c9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/compression/WALCompressionTest.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALSignalEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.io.LogWriter; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader; +import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALFileVersion; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALInputStream; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALReader; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter; @@ -185,10 +186,10 @@ public void testUncompressedWALStructure() try (DataInputStream dataInputStream = new DataInputStream(new BufferedInputStream(Files.newInputStream(walFile.toPath())))) { - byte[] magicStringBytes = new byte[WALWriter.MAGIC_STRING_V2_BYTES]; + byte[] magicStringBytes = new byte[WALFileVersion.V2.getVersionBytes().length]; // head magic string dataInputStream.readFully(magicStringBytes); - Assert.assertEquals(WALWriter.MAGIC_STRING_V2, new String(magicStringBytes)); + Assert.assertEquals(WALFileVersion.V2.getVersionString(), new String(magicStringBytes)); Assert.assertEquals( CompressionType.UNCOMPRESSED, CompressionType.deserialize(dataInputStream.readByte())); Assert.assertEquals(buf.array().length, dataInputStream.readInt()); @@ -202,7 +203,7 @@ public void testUncompressedWALStructure() dataInputStream.readFully(metadataBuf.array()); // Tail magic string dataInputStream.readFully(magicStringBytes); - Assert.assertEquals(WALWriter.MAGIC_STRING_V2, new String(magicStringBytes)); + Assert.assertEquals(WALFileVersion.V2.getVersionString(), new String(magicStringBytes)); } } @@ -238,10 +239,10 @@ public void testCompressedWALStructure() try (DataInputStream dataInputStream = new DataInputStream(new BufferedInputStream(Files.newInputStream(walFile.toPath())))) { - byte[] magicStringBytes = new byte[WALWriter.MAGIC_STRING_V2_BYTES]; + byte[] magicStringBytes = new byte[WALFileVersion.V2.getVersionBytes().length]; // head magic string dataInputStream.readFully(magicStringBytes); - Assert.assertEquals(WALWriter.MAGIC_STRING_V2, new String(magicStringBytes)); + Assert.assertEquals(WALFileVersion.V2.getVersionString(), new String(magicStringBytes)); Assert.assertEquals( CompressionType.LZ4, CompressionType.deserialize(dataInputStream.readByte())); Assert.assertEquals(compressed.length, dataInputStream.readInt()); @@ -258,7 +259,7 @@ public void testCompressedWALStructure() dataInputStream.readFully(metadataBuf.array()); // Tail magic string dataInputStream.readFully(magicStringBytes); - Assert.assertEquals(WALWriter.MAGIC_STRING_V2, new String(magicStringBytes)); + Assert.assertEquals(WALFileVersion.V2.getVersionString(), new String(magicStringBytes)); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java index aca54fe2f19a5..e8c5ee02dfc26 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALRepairWriterTest.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALInfoEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALByteBufReader; +import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALFileVersion; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALMetaData; import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALByteBufferForTest; @@ -72,11 +73,12 @@ public void testEmptyFile() throws IOException { new WALRepairWriter(logFile).repair(walMetaData); // verify file, marker + metadata(search index + size number) + metadata size + head magic // string + tail magic string + // empty file will be assumed as V1 (because of no header magic) Assert.assertEquals( Byte.BYTES + (Long.BYTES + Integer.BYTES) + Integer.BYTES - + WALWriter.MAGIC_STRING_V2_BYTES * 2, + + WALFileVersion.V1.getVersionBytes().length, logFile.length()); try (WALByteBufReader reader = new WALByteBufReader(logFile)) { Assert.assertFalse(reader.hasNext()); @@ -96,11 +98,12 @@ public void testFileWithoutMagicString() throws IOException { // repair new WALRepairWriter(logFile).repair(walMetaData); // verify file, marker + metadata(search index + size number) + metadata size + magic string + // file too small will be assumed as V1 (because of no header magic) Assert.assertEquals( Byte.BYTES + (Long.BYTES + Integer.BYTES) + Integer.BYTES - + WALWriter.MAGIC_STRING_V2_BYTES * 2, + + WALFileVersion.V1.getVersionBytes().length, logFile.length()); try (WALByteBufReader reader = new WALByteBufReader(logFile)) { Assert.assertFalse(reader.hasNext()); From 138ff26cd3042e7d70a564e70e7cdd5d3447c0a6 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 23 Jul 2024 14:08:27 +0800 Subject: [PATCH 07/29] Pipe: Fix empty tablets generated by pattern parsing on sender side may cause NPE on receiver side (#12994) Co-authored-by: Steve Yurong Su (cherry picked from commit a45adbc8534c1a457d43b220a4e8a7cef4218e96) --- .../request/PipeTransferTabletRawReq.java | 5 ++++ .../task/connection/PipeEventCollector.java | 26 +++++++++---------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java index 40c191b9d9ea9..389eb63d07c84 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java @@ -61,6 +61,11 @@ public InsertTabletStatement constructStatement() { new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); try { + if (tablet == null || tablet.rowSize == 0) { + // Empty statement, will be filtered after construction + return new InsertTabletStatement(); + } + final TSInsertTabletReq request = new TSInsertTabletReq(); for (final IMeasurementSchema measurementSchema : tablet.getSchemas()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java index fb88c8983e5b3..c4c91f80f582b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java @@ -97,8 +97,7 @@ private void parseAndCollectEvent(final PipeInsertNodeTabletInsertionEvent sourc if (sourceEvent.shouldParseTimeOrPattern()) { for (final PipeRawTabletInsertionEvent parsedEvent : sourceEvent.toRawTabletInsertionEvents()) { - hasNoGeneratedEvent = false; - collectEvent(parsedEvent); + collectParsedRawTableEvent(parsedEvent); } } else { collectEvent(sourceEvent); @@ -106,15 +105,10 @@ private void parseAndCollectEvent(final PipeInsertNodeTabletInsertionEvent sourc } private void parseAndCollectEvent(final PipeRawTabletInsertionEvent sourceEvent) { - if (sourceEvent.shouldParseTimeOrPattern()) { - final PipeRawTabletInsertionEvent parsedEvent = sourceEvent.parseEventWithPatternOrTime(); - if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) { - hasNoGeneratedEvent = false; - collectEvent(parsedEvent); - } - } else { - collectEvent(sourceEvent); - } + collectParsedRawTableEvent( + sourceEvent.shouldParseTimeOrPattern() + ? sourceEvent.parseEventWithPatternOrTime() + : sourceEvent); } private void parseAndCollectEvent(final PipeTsFileInsertionEvent sourceEvent) throws Exception { @@ -132,14 +126,20 @@ private void parseAndCollectEvent(final PipeTsFileInsertionEvent sourceEvent) th try { for (final TabletInsertionEvent parsedEvent : sourceEvent.toTabletInsertionEvents()) { - hasNoGeneratedEvent = false; - collectEvent(parsedEvent); + collectParsedRawTableEvent((PipeRawTabletInsertionEvent) parsedEvent); } } finally { sourceEvent.close(); } } + private void collectParsedRawTableEvent(final PipeRawTabletInsertionEvent parsedEvent) { + if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) { + hasNoGeneratedEvent = false; + collectEvent(parsedEvent); + } + } + private void parseAndCollectEvent(final PipeSchemaRegionWritePlanEvent deleteDataEvent) { // Only used by events containing delete data node, no need to bind progress index here since // delete data event does not have progress index currently From 12427ada58be42757eb75cf70af9ee85b42302c4 Mon Sep 17 00:00:00 2001 From: Haonan Date: Tue, 23 Jul 2024 14:22:09 +0800 Subject: [PATCH 08/29] Bump tsfile version to 1.0.1-a6fb416-SNAPSHOT (#12999) (cherry picked from commit 41213b097fbaac262c3a3044e5cc2f792f1dadba) From f39843d0bd1991ba47a7162de86395d322fddc99 Mon Sep 17 00:00:00 2001 From: Brian Demers Date: Tue, 23 Jul 2024 02:30:28 -0400 Subject: [PATCH 09/29] Allows for unit tests to be run in parallel (#12980) NOTE: This does NOT set Surefire's `forkCount` property, that will be done on an isolated commit This is to ensure these changes don't negatively affect the existing build. Details on changes: - Change to test working directory The working directory was changed from `${project.build.directory}` to `${project.build.directory}/fork_${surefire.forkNumber}` (single fork builds would be `/target/fork_1`), This isolates any test output written to relative paths. Long term tests may want to be more explicit where they write output to, (e.g. using JUnit 5's `@TempDir` annotation) - IOTDB_CONF property Becuse the working directory change, the IOTDB_CONF directory needs to resolve differently, however, even without parallel tests, this property _should_ use an absolute path - Test run `mkdirs` The tests assumed that the `./target` directory was always present (relative to the working directory), a change to the working directory invalidated this assumption, so any directories a test uses should be created. This is probably a good idea even without the parallel test concern. (but in the future this could be replaced with something like JUnit 5's `@TempDir` annotation) - Hardcoded Ports TestUtils, was using a hardcodes sequentially from `6001`, it now uses free JUnit 5's `@TempDir` annotation ports Fixes: #12979 (cherry picked from commit 77b41310f55ec602f8cba96cca9529f5f0792f24) --- .../org/apache/iotdb/tool/WriteDataFileTest.java | 14 ++++++++++++++ .../apache/iotdb/consensus/ratis/TestUtils.java | 11 ++++++++++- iotdb-core/datanode/pom.xml | 2 +- .../iotdb/db/utils/constant/TestConstant.java | 8 +++++++- .../dataregion/wal/io/WALFileTest.java | 2 ++ pom.xml | 2 ++ 6 files changed, 36 insertions(+), 3 deletions(-) diff --git a/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/WriteDataFileTest.java b/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/WriteDataFileTest.java index c198b530c4e74..eb9e01d293700 100644 --- a/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/WriteDataFileTest.java +++ b/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/WriteDataFileTest.java @@ -19,8 +19,10 @@ package org.apache.iotdb.tool; +import org.junit.Before; import org.junit.Test; +import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -28,6 +30,18 @@ import static org.junit.Assert.assertTrue; public class WriteDataFileTest { + + /** + * Create the 'target' directory before the tests run. When running tests with multiple threads, + * the working directory might be 'target/fork_#' which would changes the assumption that the + * 'target' directory exists already. When running tests via an IDE, the working directory might + * be the project directory, and the target directory likely already exists. + */ + @Before + public void createTestDirectory() { + new File("target").mkdirs(); + } + @Test public void writeCsvFileTest() { List headerNames = diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java index 4aaa5f77d9e90..f5997c261fdff 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java @@ -51,6 +51,7 @@ import java.io.FileNotFoundException; import java.io.FileWriter; import java.io.IOException; +import java.net.ServerSocket; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -251,7 +252,7 @@ private MiniCluster( this.servers = new ArrayList<>(); for (int i = 0; i < replicas; i++) { - peers.add(new Peer(gid, i, new TEndPoint("127.0.0.1", 6001 + i))); + peers.add(new Peer(gid, i, new TEndPoint("127.0.0.1", randomFreePort()))); final File storage = storageProvider.apply(i); FileUtils.deleteFileQuietly(storage); @@ -493,4 +494,12 @@ MiniCluster create() { return new MiniCluster(gid, replicas, peerStorageProvider, smProvider, ratisConfig); } } + + private static int randomFreePort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } catch (IOException e) { + throw new IllegalStateException("Failed to find free server socket port.", e); + } + } } diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml index 007b146784cc8..7ddf561b3fd85 100644 --- a/iotdb-core/datanode/pom.xml +++ b/iotdb-core/datanode/pom.xml @@ -380,7 +380,7 @@ ${iotdb.ut.skip} - src/test/resources + ${project.basedir}/src/test/resources false random diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/TestConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/TestConstant.java index 1647a19b0cbb1..a0724c9ae1c70 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/TestConstant.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/TestConstant.java @@ -25,7 +25,7 @@ public class TestConstant { - public static final String BASE_OUTPUT_PATH = "target".concat(File.separator); + public static final String BASE_OUTPUT_PATH = baseOutputDirectory(); public static final String OUTPUT_DATA_DIR = BASE_OUTPUT_PATH.concat("data").concat(File.separator); public static final String PARTIAL_PATH_STRING = @@ -109,4 +109,10 @@ public static String getTestTsFileDir( virtualStorageGroupId, timePartitionId); } + + private static String baseOutputDirectory() { + File dir = new File("target"); + dir.mkdirs(); + return dir.getPath().concat(File.separator); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileTest.java index 3e6ec4937adf8..a15cc27fa268d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALFileTest.java @@ -68,6 +68,8 @@ public class WALFileTest { public void setUp() throws Exception { if (walFile.exists()) { Files.delete(walFile.toPath()); + } else { + walFile.getParentFile().mkdirs(); } } diff --git a/pom.xml b/pom.xml index 04b2fbea5b0be..58a9047b3521f 100644 --- a/pom.xml +++ b/pom.xml @@ -743,6 +743,8 @@ 3.1.2 ${argLine} -Xmx1024m + + ${project.build.directory}/fork_${surefire.forkNumber}