diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index 9fb8dfc62d9bd..4518cb4c56d06 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -249,7 +249,8 @@ protected boolean doCompaction() { "{}-{} [Compaction] compaction finish, start to delete old files", storageGroupName, dataRegionId); - CompactionUtils.deleteSourceTsFileAndUpdateFileMetrics(selectedTsFileResourceList); + CompactionUtils.deleteSourceTsFileAndUpdateFileMetrics( + selectedTsFileResourceList, sequence); CompactionUtils.deleteModificationForSourceFile( selectedTsFileResourceList, storageGroupName + "-" + dataRegionId); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java index ea7d41b27d17e..ae76fc1a0cf24 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java @@ -54,6 +54,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -439,54 +440,30 @@ public static boolean validateSingleTsFiles(TsFileResource resource) { public static void deleteSourceTsFileAndUpdateFileMetrics( List sourceSeqResourceList, List sourceUnseqResourceList) { - // delete seq file - deleteSourceTsFileAndUpdateFileMetrics(sourceSeqResourceList); - - // delete unSeq file - long[] unSequenceFileSize = new long[sourceUnseqResourceList.size()]; - List unSequenceFileNames = new ArrayList<>(); - boolean removeSuccess = true; - for (int i = 0; i < sourceUnseqResourceList.size(); i++) { - TsFileResource tsFileResource = sourceUnseqResourceList.get(i); - if (!tsFileResource.remove()) { - removeSuccess = false; - logger.warn( - "[Compaction] delete unSequence file failed,file path is {}", - tsFileResource.getTsFile().getAbsolutePath()); - } else { - logger.info( - "[Compaction] delete unSequence file :{}", - tsFileResource.getTsFile().getAbsolutePath()); - unSequenceFileSize[i] = tsFileResource.getTsFileSize(); - unSequenceFileNames.add(tsFileResource.getTsFile().getName()); - } - } - if (removeSuccess) { - FileMetrics.getInstance().deleteFile(unSequenceFileSize, false, unSequenceFileNames); - } + deleteSourceTsFileAndUpdateFileMetrics(sourceSeqResourceList, true); + deleteSourceTsFileAndUpdateFileMetrics(sourceUnseqResourceList, false); } public static void deleteSourceTsFileAndUpdateFileMetrics( - List sourceSeqResourceList) { - long[] sequenceFileSize = new long[sourceSeqResourceList.size()]; - List sequenceFileNames = new ArrayList<>(); - boolean removeSuccess = true; - for (int i = 0; i < sourceSeqResourceList.size(); i++) { - TsFileResource tsFileResource = sourceSeqResourceList.get(i); - if (!tsFileResource.remove()) { - removeSuccess = false; + List resources, boolean seq) { + long[] fileSizes = new long[resources.size()]; + List fileNames = new ArrayList<>(resources.size()); + int removeSuccessFileNum = 0; + for (TsFileResource resource : resources) { + if (!resource.remove()) { logger.warn( - "[Compaction] delete sequence file failed,file path is {}", - tsFileResource.getTsFile().getAbsolutePath()); + "[Compaction] delete file failed, file path is {}", + resource.getTsFile().getAbsolutePath()); } else { - logger.info( - "[Compaction] delete sequence file :{}", tsFileResource.getTsFile().getAbsolutePath()); - sequenceFileSize[i] = tsFileResource.getTsFileSize(); - sequenceFileNames.add(tsFileResource.getTsFile().getName()); + logger.info("[Compaction] delete file: {}", resource.getTsFile().getAbsolutePath()); + fileSizes[removeSuccessFileNum] = resource.getTsFileSize(); + fileNames.add(resource.getTsFile().getName()); + removeSuccessFileNum++; } } - if (removeSuccess) { - FileMetrics.getInstance().deleteFile(sequenceFileSize, true, sequenceFileNames); + if (removeSuccessFileNum != 0) { + fileSizes = Arrays.copyOfRange(fileSizes, 0, removeSuccessFileNum); + FileMetrics.getInstance().deleteFile(fileSizes, seq, fileNames); } } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionUpdateFileCountTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionUpdateFileCountTest.java new file mode 100644 index 0000000000000..857313125b6b3 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionUpdateFileCountTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.utils; + +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.service.metrics.FileMetrics; +import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; +import org.apache.iotdb.tsfile.exception.write.WriteProcessException; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +public class CompactionUpdateFileCountTest extends AbstractCompactionTest { + + @Before + public void setUp() + throws IOException, WriteProcessException, MetadataException, InterruptedException { + super.setUp(); + } + + @After + public void tearDown() throws IOException, StorageEngineException { + super.tearDown(); + } + + @Test + public void testSeqSpaceCompactionFileMetric() + throws MetadataException, IOException, WriteProcessException { + registerTimeseriesInMManger(2, 3, false); + long initSeqFileNum = FileMetrics.getInstance().getFileNum(true); + long initUnSeqFileNum = FileMetrics.getInstance().getFileNum(false); + createFiles(1, 2, 3, 100, 1, 0, 50, 0, false, true); + createFiles(1, 2, 3, 50, 20, 30000, 50, 50, false, true); + tsFileManager.addAll(seqResources, true); + InnerSpaceCompactionTask task = + new InnerSpaceCompactionTask( + 0, + tsFileManager, + seqResources, + true, + new ReadChunkCompactionPerformer(), + new AtomicInteger(), + 0); + Assert.assertTrue(task.start()); + Assert.assertEquals(initSeqFileNum - 1, FileMetrics.getInstance().getFileNum(true)); + Assert.assertEquals(initUnSeqFileNum, FileMetrics.getInstance().getFileNum(false)); + } + + @Test + public void testUnSeqSpaceCompactionFileMetric() + throws MetadataException, IOException, WriteProcessException { + registerTimeseriesInMManger(2, 3, false); + long initSeqFileNum = FileMetrics.getInstance().getFileNum(true); + long initUnSeqFileNum = FileMetrics.getInstance().getFileNum(false); + createFiles(1, 2, 3, 100, 1, 0, 50, 0, false, false); + createFiles(1, 2, 3, 50, 20, 10000, 50, 50, false, false); + tsFileManager.addAll(unseqResources, false); + InnerSpaceCompactionTask task = + new InnerSpaceCompactionTask( + 0, + tsFileManager, + unseqResources, + false, + new FastCompactionPerformer(false), + new AtomicInteger(), + 0); + Assert.assertTrue(task.start()); + Assert.assertEquals(initSeqFileNum, FileMetrics.getInstance().getFileNum(true)); + Assert.assertEquals(initUnSeqFileNum - 1, FileMetrics.getInstance().getFileNum(false)); + } + + @Test + public void testCrossSpaceCompactionFileMetric() + throws MetadataException, IOException, WriteProcessException { + registerTimeseriesInMManger(2, 3, false); + long initSeqFileNum = FileMetrics.getInstance().getFileNum(true); + long initUnSeqFileNum = FileMetrics.getInstance().getFileNum(false); + createFiles(1, 2, 3, 100, 1, 0, 50, 0, false, true); + createFiles(3, 2, 3, 50, 20, 10000, 50, 50, false, false); + tsFileManager.addAll(seqResources, true); + tsFileManager.addAll(unseqResources, false); + CrossSpaceCompactionTask task = + new CrossSpaceCompactionTask( + 0, + tsFileManager, + seqResources, + unseqResources, + new FastCompactionPerformer(true), + new AtomicInteger(0), + 0, + 0); + Assert.assertTrue(task.start()); + Assert.assertEquals(initSeqFileNum, FileMetrics.getInstance().getFileNum(true)); + Assert.assertEquals(initUnSeqFileNum - 3, FileMetrics.getInstance().getFileNum(false)); + } +}