diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index f6ea467f74c8a..4df1111ea7dcc 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -37,6 +37,7 @@ public enum ThreadName { FLUSH_SERVICE("Flush"), FLUSH_SUB_TASK_SERVICE("Flush-SubTask"), COMPACTION_SERVICE("Compaction"), + COMPACTION_SUB_SERVICE("Sub-Compaction"), COMPACTION_SCHEDULE("Compaction_Schedule"), WAL_DAEMON("WAL-Sync"), WAL_FORCE_DAEMON("WAL-Force"), diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index d6598a5d1437c..d8cf5a2653f5d 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@ -505,6 +505,12 @@ timestamp_precision=ms # Datatype: int # query_timeout_threshold=60000 +# The number of sub compaction threads to be set up to perform compaction. +# Currently only works for nonAligned data in cross space compaction and unseq inner space compaction. +# Set to 1 when less than or equal to 0. +# Datatype: int +# sub_compaction_thread_num=4 + #################### ### Metadata Cache Configuration #################### diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 580d8f75521d9..987051887cd10 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -436,6 +436,12 @@ public class IoTDBConfig { /** The interval of compaction task submission from queue in CompactionTaskMananger */ private long compactionSubmissionIntervalInMs = 60_000L; + /** + * The number of sub compaction threads to be set up to perform compaction. Currently only works + * for nonAligned data in cross space compaction and unseq inner space compaction. + */ + private int subCompactionTaskNum = 4; + /** whether to cache meta data(ChunkMetaData and TsFileMetaData) or not. */ private boolean metaDataCacheEnable = true; @@ -2515,6 +2521,14 @@ public void setCompactionSubmissionIntervalInMs(long interval) { compactionSubmissionIntervalInMs = interval; } + public int getSubCompactionTaskNum() { + return subCompactionTaskNum; + } + + public void setSubCompactionTaskNum(int subCompactionTaskNum) { + this.subCompactionTaskNum = subCompactionTaskNum; + } + public String getDeviceIDTransformationMethod() { return deviceIDTransformationMethod; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index e384b5c2b1e4b..d2fc333a18a61 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -374,6 +374,13 @@ private void loadProps() { properties.getProperty( "compaction_priority", conf.getCompactionPriority().toString()))); + int subtaskNum = + Integer.parseInt( + properties.getProperty( + "sub_compaction_thread_num", Integer.toString(conf.getSubCompactionTaskNum()))); + subtaskNum = subtaskNum <= 0 ? 1 : subtaskNum; + conf.setSubCompactionTaskNum(subtaskNum); + conf.setQueryTimeoutThreshold( Integer.parseInt( properties.getProperty( diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java index 7d7c9ecb4be87..bf54c18b91324 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java @@ -39,6 +39,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; @@ -55,6 +56,10 @@ public class CompactionTaskManager implements IService { // The thread pool that executes the compaction task. The default number of threads for this pool // is 10. private WrappedScheduledExecutorService taskExecutionPool; + + // The thread pool that executes the sub compaction task. + private ScheduledExecutorService subCompactionTaskExecutionPool; + public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0); private FixedPriorityBlockingQueue candidateCompactionTaskQueue = new FixedPriorityBlockingQueue<>(1024, new CompactionTaskComparator()); @@ -86,6 +91,11 @@ public synchronized void start() { IoTDBThreadPoolFactory.newScheduledThreadPool( IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread(), ThreadName.COMPACTION_SERVICE.getName()); + this.subCompactionTaskExecutionPool = + IoTDBThreadPoolFactory.newScheduledThreadPool( + IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() + * IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(), + ThreadName.COMPACTION_SUB_SERVICE.getName()); currentTaskNum = new AtomicInteger(0); compactionTaskSubmissionThreadPool = IoTDBThreadPoolFactory.newScheduledThreadPool(1, ThreadName.COMPACTION_SERVICE.getName()); @@ -310,6 +320,14 @@ public synchronized void submitTask(AbstractCompactionTask compactionTask) : "taskExecutionPool is terminated"); } + public synchronized Future submitSubTask(Callable subCompactionTask) { + if (subCompactionTaskExecutionPool != null && !subCompactionTaskExecutionPool.isTerminated()) { + Future future = subCompactionTaskExecutionPool.submit(subCompactionTask); + return future; + } + return null; + } + /** * Abort all compactions of a storage group. The running compaction tasks will be returned as a * list, the compaction threads for the storage group are not terminated util all the tasks in the diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java index 6e9cf68291859..a4328449cac50 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java @@ -20,6 +20,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.cross.rewrite.task.SubCompactionTask; import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator; import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter; import org.apache.iotdb.db.engine.compaction.writer.CrossSpaceCompactionWriter; @@ -44,12 +45,14 @@ import org.apache.iotdb.db.utils.QueryUtils; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.reader.IBatchReader; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,22 +60,26 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.stream.Collectors; /** * This tool can be used to perform inner space or cross space compaction of aligned and non aligned * timeseries . Currently, we use {@link * org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils} to speed up if it is - * an inner space compaction. + * an seq inner space compaction. */ public class CompactionUtils { private static final Logger logger = LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); + private static final int subTaskNum = + IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(); public static void compact( List seqFileResources, @@ -108,6 +115,7 @@ public static void compact( } compactionWriter.endFile(); + updateDeviceStartTimeAndEndTime(targetFileResources, compactionWriter); updatePlanIndexes(targetFileResources, seqFileResources, unseqFileResources); } finally { QueryResourceManager.getInstance().endQuery(queryId); @@ -157,9 +165,9 @@ private static void compactAlignedSeries( if (dataBatchReader.hasNextBatch()) { // chunkgroup is serialized only when at least one timeseries under this device has data compactionWriter.startChunkGroup(device, true); - compactionWriter.startMeasurement(measurementSchemas); - writeWithReader(compactionWriter, dataBatchReader); - compactionWriter.endMeasurement(); + compactionWriter.startMeasurement(measurementSchemas, 0); + writeWithReader(compactionWriter, dataBatchReader, 0); + compactionWriter.endMeasurement(0); compactionWriter.endChunkGroup(); } } @@ -170,59 +178,58 @@ private static void compactNonAlignedSeries( AbstractCompactionWriter compactionWriter, QueryContext queryContext, QueryDataSource queryDataSource) - throws MetadataException, IOException { - boolean hasStartChunkGroup = false; + throws IOException, InterruptedException { MultiTsFileDeviceIterator.MeasurementIterator measurementIterator = deviceIterator.iterateNotAlignedSeries(device, false); Set allMeasurements = measurementIterator.getAllMeasurements(); + int subTaskNums = Math.min(allMeasurements.size(), subTaskNum); + + // assign all measurements to different sub tasks + Set[] measurementsForEachSubTask = new HashSet[subTaskNums]; + int idx = 0; for (String measurement : allMeasurements) { - List measurementSchemas = new ArrayList<>(); - try { - if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) { - measurementSchemas.add(IDTableManager.getInstance().getSeriesSchema(device, measurement)); - } else { - measurementSchemas.add( - IoTDB.schemaProcessor.getSeriesSchema(new PartialPath(device, measurement))); - } - } catch (PathNotExistException e) { - logger.info("A deleted path is skipped: {}", e.getMessage()); - continue; + if (measurementsForEachSubTask[idx % subTaskNums] == null) { + measurementsForEachSubTask[idx % subTaskNums] = new HashSet(); } + measurementsForEachSubTask[idx++ % subTaskNums].add(measurement); + } - IBatchReader dataBatchReader = - constructReader( - device, - Collections.singletonList(measurement), - measurementSchemas, - allMeasurements, - queryContext, - queryDataSource, - false); + // construct sub tasks and start compacting measurements in parallel + List> futures = new ArrayList<>(); + compactionWriter.startChunkGroup(device, false); + for (int i = 0; i < subTaskNums; i++) { + futures.add( + CompactionTaskManager.getInstance() + .submitSubTask( + new SubCompactionTask( + device, + measurementsForEachSubTask[i], + queryContext, + queryDataSource, + compactionWriter, + i))); + } - if (dataBatchReader.hasNextBatch()) { - if (!hasStartChunkGroup) { - // chunkgroup is serialized only when at least one timeseries under this device has - // data - compactionWriter.startChunkGroup(device, false); - hasStartChunkGroup = true; - } - compactionWriter.startMeasurement(measurementSchemas); - writeWithReader(compactionWriter, dataBatchReader); - compactionWriter.endMeasurement(); + // wait for all sub tasks finish + for (int i = 0; i < subTaskNums; i++) { + try { + futures.get(i).get(); + } catch (InterruptedException | ExecutionException e) { + logger.error("SubCompactionTask meet errors ", e); + Thread.interrupted(); + throw new InterruptedException(); } } - if (hasStartChunkGroup) { - compactionWriter.endChunkGroup(); - } + compactionWriter.endChunkGroup(); } - private static void writeWithReader(AbstractCompactionWriter writer, IBatchReader reader) - throws IOException { + public static void writeWithReader( + AbstractCompactionWriter writer, IBatchReader reader, int subTaskId) throws IOException { while (reader.hasNextBatch()) { BatchData batchData = reader.nextBatch(); while (batchData.hasCurrent()) { - writer.write(batchData.currentTime(), batchData.currentValue()); + writer.write(batchData.currentTime(), batchData.currentValue(), subTaskId); batchData.next(); } } @@ -232,7 +239,7 @@ private static void writeWithReader(AbstractCompactionWriter writer, IBatchReade * @param measurementIds if device is aligned, then measurementIds contain all measurements. If * device is not aligned, then measurementIds only contain one measurement. */ - private static IBatchReader constructReader( + public static IBatchReader constructReader( String deviceId, List measurementIds, List measurementSchemas, @@ -268,6 +275,29 @@ private static AbstractCompactionWriter getCompactionWriter( } } + private static void updateDeviceStartTimeAndEndTime( + List targetResources, AbstractCompactionWriter compactionWriter) { + List targetFileWriters = compactionWriter.getFileIOWriter(); + for (int i = 0; i < targetFileWriters.size(); i++) { + TsFileIOWriter fileIOWriter = targetFileWriters.get(i); + TsFileResource fileResource = targetResources.get(i); + // The tmp target file may does not have any data points written due to the existence of the + // mods file, and it will be deleted after compaction. So skip the target file that has been + // deleted. + if (!fileResource.getTsFile().exists()) { + continue; + } + for (Map.Entry> entry : + fileIOWriter.getDeviceTimeseriesMetadataMap().entrySet()) { + String device = entry.getKey(); + for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) { + fileResource.updateStartTime(device, timeseriesMetadata.getStatistics().getStartTime()); + fileResource.updateEndTime(device, timeseriesMetadata.getStatistics().getEndTime()); + } + } + } + } + private static void updatePlanIndexes( List targetResources, List seqResources, @@ -280,7 +310,7 @@ private static void updatePlanIndexes( // in the new file for (int i = 0; i < targetResources.size(); i++) { TsFileResource targetResource = targetResources.get(i); - // remove the target file been deleted from list + // remove the target file that has been deleted from list if (!targetResource.getTsFile().exists()) { targetResources.remove(i--); continue; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java new file mode 100644 index 0000000000000..315122e3cf647 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/SubCompactionTask.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.compaction.cross.rewrite.task; + +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.CompactionUtils; +import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.exception.metadata.PathNotExistException; +import org.apache.iotdb.db.metadata.idtable.IDTableManager; +import org.apache.iotdb.db.metadata.path.PartialPath; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.tsfile.read.reader.IBatchReader; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; + +/** + * This class is used to implement reading the measurements and writing to the target files in + * parallel in the compaction. Currently, it only works for nonAligned data in cross space + * compaction and unseq inner space compaction. + */ +public class SubCompactionTask implements Callable { + private static final Logger logger = + LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); + private final String device; + private final Set measurementList; + private final QueryContext queryContext; + private final QueryDataSource queryDataSource; + private final AbstractCompactionWriter compactionWriter; + private final int taskId; + + public SubCompactionTask( + String device, + Set measurementList, + QueryContext queryContext, + QueryDataSource queryDataSource, + AbstractCompactionWriter compactionWriter, + int taskId) { + this.device = device; + this.measurementList = measurementList; + this.queryContext = queryContext; + this.queryDataSource = queryDataSource; + this.compactionWriter = compactionWriter; + this.taskId = taskId; + } + + @Override + public Void call() throws Exception { + for (String measurement : measurementList) { + List measurementSchemas = new ArrayList<>(); + try { + if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) { + measurementSchemas.add(IDTableManager.getInstance().getSeriesSchema(device, measurement)); + } else { + measurementSchemas.add( + IoTDB.schemaProcessor.getSeriesSchema(new PartialPath(device, measurement))); + } + } catch (PathNotExistException e) { + logger.info("A deleted path is skipped: {}", e.getMessage()); + continue; + } + + IBatchReader dataBatchReader = + CompactionUtils.constructReader( + device, + Collections.singletonList(measurement), + measurementSchemas, + measurementList, + queryContext, + queryDataSource, + false); + + if (dataBatchReader.hasNextBatch()) { + compactionWriter.startMeasurement(measurementSchemas, taskId); + CompactionUtils.writeWithReader(compactionWriter, dataBatchReader, taskId); + compactionWriter.endMeasurement(taskId); + } + } + return null; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java index 053631a516a32..5c1460230dc0a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java @@ -23,7 +23,6 @@ import org.apache.iotdb.db.engine.compaction.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.constant.CompactionType; import org.apache.iotdb.db.engine.compaction.constant.ProcessChunkType; -import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.metrics.config.MetricConfigDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; @@ -38,8 +37,12 @@ import java.util.List; public abstract class AbstractCompactionWriter implements AutoCloseable { + protected static final int subTaskNum = + IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(); - protected IChunkWriter chunkWriter; + // Each sub task has its own chunk writer. + // The index of the array corresponds to subTaskId. + protected IChunkWriter[] chunkWriters = new IChunkWriter[subTaskNum]; protected boolean isAlign; @@ -49,25 +52,26 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { private final boolean enableMetrics = MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric(); - // point count in current measurment, which is used to check size - private int measurementPointCount; + // Each sub task has point count in current measurment, which is used to check size. + // The index of the array corresponds to subTaskId. + protected int[] measurementPointCountArray = new int[subTaskNum]; public abstract void startChunkGroup(String deviceId, boolean isAlign) throws IOException; public abstract void endChunkGroup() throws IOException; - public void startMeasurement(List measurementSchemaList) { - measurementPointCount = 0; + public void startMeasurement(List measurementSchemaList, int subTaskId) { + measurementPointCountArray[subTaskId] = 0; if (isAlign) { - chunkWriter = new AlignedChunkWriterImpl(measurementSchemaList); + chunkWriters[subTaskId] = new AlignedChunkWriterImpl(measurementSchemaList); } else { - chunkWriter = new ChunkWriterImpl(measurementSchemaList.get(0), true); + chunkWriters[subTaskId] = new ChunkWriterImpl(measurementSchemaList.get(0), true); } } - public abstract void endMeasurement() throws IOException; + public abstract void endMeasurement(int subTaskId) throws IOException; - public abstract void write(long timestamp, Object value) throws IOException; + public abstract void write(long timestamp, Object value, int subTaskId) throws IOException; public abstract void write(long[] timestamps, Object values); @@ -75,9 +79,9 @@ public void startMeasurement(List measurementSchemaList) { public abstract void close() throws IOException; - protected void writeDataPoint(Long timestamp, Object value) { + protected void writeDataPoint(Long timestamp, Object value, int subTaskId) { if (!isAlign) { - ChunkWriterImpl chunkWriter = (ChunkWriterImpl) this.chunkWriter; + ChunkWriterImpl chunkWriter = (ChunkWriterImpl) this.chunkWriters[subTaskId]; switch (chunkWriter.getDataType()) { case TEXT: chunkWriter.write(timestamp, (Binary) value); @@ -101,7 +105,7 @@ protected void writeDataPoint(Long timestamp, Object value) { throw new UnsupportedOperationException("Unknown data type " + chunkWriter.getDataType()); } } else { - AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriter; + AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId]; for (TsPrimitiveType val : (TsPrimitiveType[]) value) { if (val == null) { chunkWriter.write(timestamp, null, true); @@ -133,28 +137,37 @@ protected void writeDataPoint(Long timestamp, Object value) { } chunkWriter.write(timestamp); } - measurementPointCount++; + measurementPointCountArray[subTaskId] += 1; } - protected void checkChunkSizeAndMayOpenANewChunk(TsFileIOWriter fileWriter) throws IOException { - if (measurementPointCount % 10 == 0 && checkChunkSize()) { - writeRateLimit(chunkWriter.estimateMaxSeriesMemSize()); + protected void flushChunkToFileWriter(TsFileIOWriter targetWriter, int subTaskId) + throws IOException { + writeRateLimit(chunkWriters[subTaskId].estimateMaxSeriesMemSize()); + synchronized (targetWriter) { + chunkWriters[subTaskId].writeToFileWriter(targetWriter); + } + } + + protected void checkChunkSizeAndMayOpenANewChunk(TsFileIOWriter fileWriter, int subTaskId) + throws IOException { + if (measurementPointCountArray[subTaskId] % 10 == 0 && checkChunkSize(subTaskId)) { + flushChunkToFileWriter(fileWriter, subTaskId); CompactionMetricsManager.recordWriteInfo( this instanceof CrossSpaceCompactionWriter ? CompactionType.CROSS_COMPACTION : CompactionType.INNER_UNSEQ_COMPACTION, ProcessChunkType.DESERIALIZE_CHUNK, this.isAlign, - chunkWriter.estimateMaxSeriesMemSize()); - chunkWriter.writeToFileWriter(fileWriter); + chunkWriters[subTaskId].estimateMaxSeriesMemSize()); } } - private boolean checkChunkSize() { - if (chunkWriter instanceof AlignedChunkWriterImpl) { - return ((AlignedChunkWriterImpl) chunkWriter).checkIsChunkSizeOverThreshold(targetChunkSize); + protected boolean checkChunkSize(int subTaskId) { + if (chunkWriters[subTaskId] instanceof AlignedChunkWriterImpl) { + return ((AlignedChunkWriterImpl) chunkWriters[subTaskId]) + .checkIsChunkSizeOverThreshold(targetChunkSize); } else { - return chunkWriter.estimateMaxSeriesMemSize() > targetChunkSize; + return chunkWriters[subTaskId].estimateMaxSeriesMemSize() > targetChunkSize; } } @@ -163,8 +176,5 @@ protected void writeRateLimit(long bytesLength) { CompactionTaskManager.getInstance().getMergeWriteRateLimiter(), bytesLength); } - protected void updateDeviceStartAndEndTime(TsFileResource targetResource, long timestamp) { - targetResource.updateStartTime(deviceId, timestamp); - targetResource.updateEndTime(deviceId, timestamp); - } + public abstract List getFileIOWriter(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java index 2ebe22581d7ce..3e245cfc35a13 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java @@ -21,7 +21,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; -import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import java.io.IOException; @@ -36,68 +35,73 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter { // source tsfiles private List seqTsFileResources; - private int seqFileIndex; + // Each sub task has its corresponding seq file index. + // The index of the array corresponds to subTaskId. + private int[] seqFileIndexArray = new int[subTaskNum]; + // device end time in each source seq file private final long[] currentDeviceEndTime; + // whether each target file is empty or not private final boolean[] isEmptyFile; - private final boolean[] hasTargetFileStartChunkGroup; + // whether each target file has device data or not + private final boolean[] isDeviceExistedInTargetFiles; - private final List targetTsFileResources; + // current chunk group header size + private int chunkGroupHeaderSize; public CrossSpaceCompactionWriter( List targetResources, List seqFileResources) throws IOException { currentDeviceEndTime = new long[seqFileResources.size()]; isEmptyFile = new boolean[seqFileResources.size()]; - hasTargetFileStartChunkGroup = new boolean[seqFileResources.size()]; + isDeviceExistedInTargetFiles = new boolean[targetResources.size()]; for (int i = 0; i < targetResources.size(); i++) { - this.fileWriterList.add(new RestorableTsFileIOWriter(targetResources.get(i).getTsFile())); + this.fileWriterList.add(new TsFileIOWriter(targetResources.get(i).getTsFile())); isEmptyFile[i] = true; } this.seqTsFileResources = seqFileResources; - this.targetTsFileResources = targetResources; - seqFileIndex = 0; } @Override public void startChunkGroup(String deviceId, boolean isAlign) throws IOException { this.deviceId = deviceId; this.isAlign = isAlign; - this.seqFileIndex = 0; + this.seqFileIndexArray = new int[subTaskNum]; checkIsDeviceExistAndGetDeviceEndTime(); - for (int i = 0; i < seqTsFileResources.size(); i++) { - hasTargetFileStartChunkGroup[i] = false; + for (int i = 0; i < fileWriterList.size(); i++) { + chunkGroupHeaderSize = fileWriterList.get(i).startChunkGroup(deviceId); } } @Override public void endChunkGroup() throws IOException { for (int i = 0; i < seqTsFileResources.size(); i++) { - if (hasTargetFileStartChunkGroup[i]) { - fileWriterList.get(i).endChunkGroup(); + TsFileIOWriter targetFileWriter = fileWriterList.get(i); + if (isDeviceExistedInTargetFiles[i]) { + targetFileWriter.endChunkGroup(); + } else { + targetFileWriter.truncate(targetFileWriter.getPos() - chunkGroupHeaderSize); } + isDeviceExistedInTargetFiles[i] = false; } deviceId = null; } @Override - public void endMeasurement() throws IOException { - writeRateLimit(chunkWriter.estimateMaxSeriesMemSize()); - chunkWriter.writeToFileWriter(fileWriterList.get(seqFileIndex)); - chunkWriter = null; - seqFileIndex = 0; + public void endMeasurement(int subTaskId) throws IOException { + flushChunkToFileWriter(fileWriterList.get(seqFileIndexArray[subTaskId]), subTaskId); + seqFileIndexArray[subTaskId] = 0; } @Override - public void write(long timestamp, Object value) throws IOException { - checkTimeAndMayFlushChunkToCurrentFile(timestamp); - checkAndMayStartChunkGroup(); - writeDataPoint(timestamp, value); - updateDeviceStartAndEndTime(targetTsFileResources.get(seqFileIndex), timestamp); - checkChunkSizeAndMayOpenANewChunk(fileWriterList.get(seqFileIndex)); - isEmptyFile[seqFileIndex] = false; + public void write(long timestamp, Object value, int subTaskId) throws IOException { + checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId); + writeDataPoint(timestamp, value, subTaskId); + checkChunkSizeAndMayOpenANewChunk(fileWriterList.get(seqFileIndexArray[subTaskId]), subTaskId); + isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true; + isEmptyFile[seqFileIndexArray[subTaskId]] = false; } @Override @@ -123,16 +127,21 @@ public void close() throws IOException { } fileWriterList = null; seqTsFileResources = null; - chunkWriter = null; } - private void checkTimeAndMayFlushChunkToCurrentFile(long timestamp) throws IOException { + @Override + public List getFileIOWriter() { + return fileWriterList; + } + + private void checkTimeAndMayFlushChunkToCurrentFile(long timestamp, int subTaskId) + throws IOException { + int fileIndex = seqFileIndexArray[subTaskId]; // if timestamp is later than the current source seq tsfile, than flush chunk writer - while (timestamp > currentDeviceEndTime[seqFileIndex]) { - if (seqFileIndex != seqTsFileResources.size() - 1) { - writeRateLimit(chunkWriter.estimateMaxSeriesMemSize()); - chunkWriter.writeToFileWriter(fileWriterList.get(seqFileIndex)); - seqFileIndex++; + while (timestamp > currentDeviceEndTime[fileIndex]) { + if (fileIndex != seqTsFileResources.size() - 1) { + flushChunkToFileWriter(fileWriterList.get(fileIndex), subTaskId); + seqFileIndexArray[subTaskId] = ++fileIndex; } else { // If the seq file is deleted for various reasons, the following two situations may occur // when selecting the source files: (1) unseq files may have some devices or measurements @@ -168,11 +177,4 @@ private void checkIsDeviceExistAndGetDeviceEndTime() throws IOException { fileIndex++; } } - - private void checkAndMayStartChunkGroup() throws IOException { - if (!hasTargetFileStartChunkGroup[seqFileIndex]) { - fileWriterList.get(seqFileIndex).startChunkGroup(deviceId); - hasTargetFileStartChunkGroup[seqFileIndex] = true; - } - } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java index 7b0e31095dfd3..af2cc53c67ed4 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java @@ -19,22 +19,20 @@ package org.apache.iotdb.db.engine.compaction.writer; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import java.io.IOException; +import java.util.Collections; +import java.util.List; public class InnerSpaceCompactionWriter extends AbstractCompactionWriter { private TsFileIOWriter fileWriter; private boolean isEmptyFile; - private final TsFileResource targetTsFileResource; - public InnerSpaceCompactionWriter(TsFileResource targetFileResource) throws IOException { - fileWriter = new RestorableTsFileIOWriter(targetFileResource.getTsFile()); + this.fileWriter = new TsFileIOWriter(targetFileResource.getTsFile()); isEmptyFile = true; - this.targetTsFileResource = targetFileResource; } @Override @@ -50,17 +48,14 @@ public void endChunkGroup() throws IOException { } @Override - public void endMeasurement() throws IOException { - writeRateLimit(chunkWriter.estimateMaxSeriesMemSize()); - chunkWriter.writeToFileWriter(fileWriter); - chunkWriter = null; + public void endMeasurement(int subTaskId) throws IOException { + flushChunkToFileWriter(fileWriter, subTaskId); } @Override - public void write(long timestamp, Object value) throws IOException { - writeDataPoint(timestamp, value); - updateDeviceStartAndEndTime(targetTsFileResource, timestamp); - checkChunkSizeAndMayOpenANewChunk(fileWriter); + public void write(long timestamp, Object value, int subTaskId) throws IOException { + writeDataPoint(timestamp, value, subTaskId); + checkChunkSizeAndMayOpenANewChunk(fileWriter, subTaskId); isEmptyFile = false; } @@ -80,7 +75,11 @@ public void close() throws IOException { if (fileWriter != null && fileWriter.canWrite()) { fileWriter.close(); } - chunkWriter = null; fileWriter = null; } + + @Override + public List getFileIOWriter() { + return Collections.singletonList(fileWriter); + } } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java index a6aab11e1c625..69ff88dee05a0 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java @@ -30,9 +30,14 @@ import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader; import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; +import org.apache.iotdb.tsfile.file.MetaMarker; +import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader; +import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.reader.IBatchReader; import org.apache.iotdb.tsfile.utils.Pair; @@ -202,6 +207,20 @@ public void testSeqInnerSpaceCompactionWithDifferentTimeseries() CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true); CompactionUtils.compact(seqResources, unseqResources, targetResources); CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG); + assertEquals( + 0, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0")); + assertEquals( + 0, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1")); + assertEquals( + 250, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2")); + assertEquals( + 600, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3")); + assertEquals( + 600, targetResources.get(0).getStartTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4")); + for (int i = 0; i < 5; i++) { + assertEquals( + 749, targetResources.get(0).getEndTime(COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + i)); + } for (int i = 0; i < 5; i++) { for (int j = 0; j < 5; j++) { @@ -1752,7 +1771,7 @@ public void testCrossSpaceCompactionWithSameTimeseries() } /** - * Total 5 seq files and 5 unseq files, each file has different nonAligned timeseries. + * Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries. * *

Seq files
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range @@ -1836,6 +1855,34 @@ public void testCrossSpaceCompactionWithDifferentTimeseries() CompactionUtils.compact(seqResources, unseqResources, targetResources); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); + List deviceIdList = new ArrayList<>(); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"); + for (int i = 0; i < 2; i++) { + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3")); + check(targetResources.get(i), deviceIdList); + } + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"); + for (int i = 2; i < 4; i++) { + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3")); + check(targetResources.get(i), deviceIdList); + } + Map measurementMaxTime = new HashMap<>(); for (int i = 0; i < 4; i++) { @@ -1905,7 +1952,7 @@ public void testCrossSpaceCompactionWithDifferentTimeseries() } /** - * Total 5 seq files and 5 unseq files, each file has different nonAligned timeseries. + * Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries. * *

Seq files
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range @@ -2002,6 +2049,34 @@ public void testCrossSpaceCompactionWithAllDataDeletedInTimeseries() CompactionUtils.compact(seqResources, unseqResources, targetResources); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); + List deviceIdList = new ArrayList<>(); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"); + for (int i = 0; i < 2; i++) { + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3")); + check(targetResources.get(i), deviceIdList); + } + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"); + for (int i = 2; i < 4; i++) { + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3")); + check(targetResources.get(i), deviceIdList); + } + Map measurementMaxTime = new HashMap<>(); for (int i = 0; i < 4; i++) { for (int j = 0; j < 5; j++) { @@ -2072,7 +2147,7 @@ public void testCrossSpaceCompactionWithAllDataDeletedInTimeseries() } /** - * Total 5 seq files and 5 unseq files, each file has different nonAligned timeseries. + * Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries. * *

Seq files
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range @@ -2165,6 +2240,32 @@ public void testCrossSpaceCompactionWithAllDataDeletedInDevice() CompactionUtils.compact(seqResources, unseqResources, targetResources); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); + List deviceIdList = new ArrayList<>(); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"); + for (int i = 0; i < 2; i++) { + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3")); + check(targetResources.get(i), deviceIdList); + } + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"); + for (int i = 2; i < 4; i++) { + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3")); + check(targetResources.get(i), deviceIdList); + } + Map measurementMaxTime = new HashMap<>(); for (int i = 0; i < 4; i++) { for (int j = 0; j < 5; j++) { @@ -2231,7 +2332,7 @@ public void testCrossSpaceCompactionWithAllDataDeletedInDevice() } /** - * Total 5 seq files and 5 unseq files, each file has different nonAligned timeseries. + * Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries. * *

Seq files
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range @@ -2245,8 +2346,8 @@ public void testCrossSpaceCompactionWithAllDataDeletedInDevice() * forth and fifth file has d0 and s0 ~ s4, time range is 450 ~ 549 and 550 ~ 649, value range is * 20450 ~ 20549 and 20550 ~ 20649. * - *

The data of d0, d1 and d2 is deleted in each file. Data in the first target file is all - * deleted. + *

The data of d0, d1 and d2 is deleted in each file. Data in the first and second target file + * is all deleted. */ @Test public void testCrossSpaceCompactionWithAllDataDeletedInOneTargetFile() @@ -2320,6 +2421,21 @@ public void testCrossSpaceCompactionWithAllDataDeletedInOneTargetFile() CompactionUtils.compact(seqResources, unseqResources, targetResources); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); + Assert.assertEquals(2, targetResources.size()); + List deviceIdList = new ArrayList<>(); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"); + for (int i = 0; i < 2; i++) { + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3")); + check(targetResources.get(i), deviceIdList); + } + Map measurementMaxTime = new HashMap<>(); for (int i = 0; i < 4; i++) { for (int j = 0; j < 5; j++) { @@ -2380,7 +2496,7 @@ public void testCrossSpaceCompactionWithAllDataDeletedInOneTargetFile() } /** - * Total 5 seq files and 5 unseq files, each file has different nonAligned timeseries. + * Total 4 seq files and 5 unseq files, each file has different nonAligned timeseries. * *

Seq files
* first and second file has d0 ~ d1 and s0 ~ s2, time range is 0 ~ 299 and 350 ~ 649, value range @@ -2477,6 +2593,48 @@ public void testCrossSpaceCompactionWithAllDataDeletedInDeviceInSeqFiles() CompactionUtils.compact(seqResources, unseqResources, targetResources); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); + Assert.assertEquals(4, targetResources.size()); + List deviceIdList = new ArrayList<>(); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"); + for (int i = 0; i < 2; i++) { + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3")); + check(targetResources.get(i), deviceIdList); + } + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"); + for (int i = 2; i < 3; i++) { + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3")); + check(targetResources.get(i), deviceIdList); + } + deviceIdList.clear(); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"); + for (int i = 3; i < 4; i++) { + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3")); + check(targetResources.get(i), deviceIdList); + } + Map measurementMaxTime = new HashMap<>(); for (int i = 0; i < 4; i++) { for (int j = 0; j < 5; j++) { @@ -2933,6 +3091,35 @@ public void testAlignedCrossSpaceCompactionWithAllDataDeletedInTimeseries() CompactionUtils.compact(seqResources, unseqResources, targetResources); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); + Assert.assertEquals(4, targetResources.size()); + List deviceIdList = new ArrayList<>(); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000"); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001"); + for (int i = 0; i < 2; i++) { + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10002")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003")); + check(targetResources.get(i), deviceIdList); + } + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10002"); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003"); + for (int i = 2; i < 4; i++) { + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10000")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10001")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10002")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d10003")); + check(targetResources.get(i), deviceIdList); + } + for (int i = TsFileGeneratorUtils.getAlignDeviceOffset(); i < TsFileGeneratorUtils.getAlignDeviceOffset() + 4; i++) { @@ -3433,6 +3620,54 @@ public void testCrossSpaceCompactionWithNewDeviceInUnseqFile() { CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources); CompactionUtils.compact(seqResources, unseqResources, targetResources); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); + + Assert.assertEquals(4, targetResources.size()); + List deviceIdList = new ArrayList<>(); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"); + for (int i = 0; i < 2; i++) { + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3")); + check(targetResources.get(i), deviceIdList); + } + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"); + for (int i = 2; i < 3; i++) { + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3")); + check(targetResources.get(i), deviceIdList); + } + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4"); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d5"); + for (int i = 3; i < 4; i++) { + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d4")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d5")); + check(targetResources.get(i), deviceIdList); + } } catch (MetadataException | IOException | WriteProcessException @@ -3456,6 +3691,35 @@ public void testCrossSpaceCompactionWithDeviceMaxTimeLaterInUnseqFile() { CompactionUtils.compact(seqResources, unseqResources, targetResources); CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG); + Assert.assertEquals(2, targetResources.size()); + List deviceIdList = new ArrayList<>(); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0"); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1"); + for (int i = 0; i < 1; i++) { + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2")); + Assert.assertFalse( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3")); + check(targetResources.get(i), deviceIdList); + } + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2"); + deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3"); + for (int i = 1; i < 2; i++) { + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d0")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d1")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d2")); + Assert.assertTrue( + targetResources.get(i).isDeviceIdExist(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3")); + check(targetResources.get(i), deviceIdList); + } + for (int i = 0; i < 4; i++) { for (int j = 0; j < 4; j++) { PartialPath path = @@ -3515,4 +3779,47 @@ private void generateModsFile( CompactionFileGeneratorUtils.generateMods(deleteMap, resource, false); } } + + /** + * Check whether target file contain empty chunk group or not. Assert fail if it contains empty + * chunk group whose deviceID is not in the deviceIdList. + */ + public void check(TsFileResource targetResource, List deviceIdList) throws IOException { + byte marker; + try (TsFileSequenceReader reader = + new TsFileSequenceReader(targetResource.getTsFile().getAbsolutePath())) { + reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1); + while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) { + switch (marker) { + case MetaMarker.CHUNK_HEADER: + case MetaMarker.TIME_CHUNK_HEADER: + case MetaMarker.VALUE_CHUNK_HEADER: + case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER: + case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER: + case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: + ChunkHeader header = reader.readChunkHeader(marker); + int dataSize = header.getDataSize(); + reader.position(reader.position() + dataSize); + break; + case MetaMarker.CHUNK_GROUP_HEADER: + ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader(); + String deviceID = chunkGroupHeader.getDeviceID(); + if (!deviceIdList.contains(deviceID)) { + Assert.fail( + "Target file " + + targetResource.getTsFile().getPath() + + " contains empty chunk group " + + deviceID); + } + break; + case MetaMarker.OPERATION_INDEX_RANGE: + reader.readPlanIndex(); + break; + default: + // the disk file is corrupted, using this file may be dangerous + throw new IOException("Unexpected marker " + marker); + } + } + } + } } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java index a413419f32576..5e0824911e72c 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java @@ -21,6 +21,7 @@ import org.apache.iotdb.db.engine.cache.ChunkCache; import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache; +import org.apache.iotdb.db.engine.compaction.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.cross.rewrite.manage.CrossSpaceCompactionResource; import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.ICrossSpaceMergeFileSelector; import org.apache.iotdb.db.engine.compaction.cross.rewrite.selector.RewriteCompactionFileSelector; @@ -108,6 +109,7 @@ public void setUp() throws MetadataException { TSFileDescriptor.getInstance().getConfig().getCompressor(), Collections.emptyMap()); } + CompactionTaskManager.getInstance().start(); Thread.currentThread().setName("pool-1-IoTDB-Compaction-1"); } @@ -118,6 +120,7 @@ public void tearDown() throws IOException, StorageEngineException { ChunkCache.getInstance().clear(); TimeSeriesMetadataCache.getInstance().clear(); IoTDB.configManager.clear(); + CompactionTaskManager.getInstance().stop(); EnvironmentUtils.cleanAllDir(); Thread.currentThread().setName(oldThreadName); new CompactionConfigRestorer().restoreCompactionConfig(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java index 7eb9a95110778..2f865f297f081 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java @@ -142,14 +142,14 @@ protected void startFile() throws IOException { out.write(VERSION_NUMBER_BYTE); } - public void startChunkGroup(String deviceId) throws IOException { + public int startChunkGroup(String deviceId) throws IOException { this.currentChunkGroupDeviceId = deviceId; if (logger.isDebugEnabled()) { logger.debug("start chunk group:{}, file position {}", deviceId, out.getPosition()); } chunkMetadataList = new ArrayList<>(); ChunkGroupHeader chunkGroupHeader = new ChunkGroupHeader(currentChunkGroupDeviceId); - chunkGroupHeader.serializeTo(out.wrapAsStream()); + return chunkGroupHeader.serializeTo(out.wrapAsStream()); } /** @@ -458,6 +458,10 @@ public void writePlanIndices() throws IOException { out.flush(); } + public void truncate(long offset) throws IOException { + out.truncate(offset); + } + /** * this function is only for Test. *