Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/UserGuide/Reference/Common-Config-Manual.md
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,14 @@ Different configuration parameters take effect in the following three ways:
|Default| true |
|Effective| After restart system |

* candidate\_compaction\_task\_queue\_size

|Name| candidate\_compaction\_task\_queue\_size |
|:---:|:--------------------------------------------|
|Description| The size of candidate compaction task queue |
|Type| Int32 |
|Default| 50 |
|Effective| After restart system |

### Write Ahead Log Configuration

Expand Down
31 changes: 20 additions & 11 deletions docs/zh/UserGuide/Reference/Common-Config-Manual.md
Original file line number Diff line number Diff line change
Expand Up @@ -1056,21 +1056,30 @@ IoTDB ConfigNode 和 DataNode 的公共配置参数位于 `conf` 目录下。

* sub\_compaction\_thread\_count

|名字| sub\_compaction\_thread\_count |
|:---:|:--|
|描述| 每个跨空间合并任务的子任务线程数 |
|类型| Int32 |
|默认值| 4 |
|改后生效方式| 重启服务生效|
|名字| sub\_compaction\_thread\_count |
|:---:|:--------------------------------|
|描述| 每个合并任务的子任务线程数,只对跨空间合并和乱序空间内合并生效 |
|类型| int32 |
|默认值| 4 |
|改后生效方式| 重启服务生效 |

* enable\_compaction\_validation

|名字| enable\_compaction\_validation |
|:---:|:--|
|描述| 开启合并结束后对顺序文件时间范围的检查 |
|类型| Boolean |
|默认值| true |
|改后生效方式| 重启服务生效|
|:---:|:-------------------------------|
|描述| 开启合并结束后对顺序文件时间范围的检查 |
|类型| Boolean |
|默认值| true |
|改后生效方式| 重启服务生效 |

* candidate\_compaction\_task\_queue\_size

|名字| candidate\_compaction\_task\_queue\_size |
|:---:|:-----------------------------------------|
|描述| 合并任务优先级队列的大小 |
|类型| int32 |
|默认值| 50 |
|改后生效方式| 重启服务生效 |

### 写前日志配置

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,10 @@ cluster_name=defaultCluster
# BALANCE: alternate two compaction types
# compaction_priority=BALANCE

# The size of candidate compaction task queue.
# Datatype: int
# candidate_compaction_task_queue_size = 50

# The target tsfile size in compaction
# Datatype: long, Unit: byte
# target_compaction_file_size=1073741824
Expand Down Expand Up @@ -726,9 +730,9 @@ cluster_name=defaultCluster
# Datatype: int
# page_size_in_byte=65536

# The maximum number of data points in a page, default 1024*1024
# The maximum number of data points in a page, default 10000
# Datatype: int
# max_number_of_points_in_page=1048576
# max_number_of_points_in_page=10000

# The threshold for pattern matching in regex
# Datatype: int
Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,9 @@ public class IoTDBConfig {

private boolean enableCompactionValidation = true;

/** The size of candidate compaction task queue. */
private int candidateCompactionTaskQueueSize = 50;

/** whether to cache meta data(ChunkMetaData and TsFileMetaData) or not. */
private boolean metaDataCacheEnable = true;

Expand Down Expand Up @@ -3657,6 +3660,14 @@ public void setEnableCompactionValidation(boolean enableCompactionValidation) {
this.enableCompactionValidation = enableCompactionValidation;
}

public int getCandidateCompactionTaskQueueSize() {
return candidateCompactionTaskQueueSize;
}

public void setCandidateCompactionTaskQueueSize(int candidateCompactionTaskQueueSize) {
this.candidateCompactionTaskQueueSize = candidateCompactionTaskQueueSize;
}

public boolean isEnableAuditLog() {
return enableAuditLog;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,11 @@ public void loadProperties(Properties properties) {
properties.getProperty(
"enable_compaction_validation",
Boolean.toString(conf.isEnableCompactionValidation()))));
conf.setCandidateCompactionTaskQueueSize(
Integer.parseInt(
properties.getProperty(
"candidate_compaction_task_queue_size",
Integer.toString(conf.getCandidateCompactionTaskQueueSize()))));

conf.setEnablePartialInsert(
Boolean.parseBoolean(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,12 @@ protected void doCompaction() {
// get resource of target file
String dataDirectory = selectedTsFileResourceList.get(0).getTsFile().getParent();
LOGGER.info(
"{}-{} [Compaction] InnerSpaceCompaction task starts with {} files",
"{}-{} [Compaction] {} InnerSpaceCompaction task starts with {} files, total file size is {} MB.",
storageGroupName,
dataRegionId,
selectedTsFileResourceList.size());
sequence ? "Sequence" : "Unsequence",
selectedTsFileResourceList.size(),
selectedFileSize / 1024 / 1024);
try {
targetTsFileResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(
Expand Down Expand Up @@ -252,10 +254,11 @@ protected void doCompaction() {

double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
LOGGER.info(
"{}-{} [Compaction] InnerSpaceCompaction task finishes successfully, target file is {},"
"{}-{} [Compaction] {} InnerSpaceCompaction task finishes successfully, target file is {},"
+ "time cost is {} s, compaction speed is {} MB/s, {}",
storageGroupName,
dataRegionId,
sequence ? "Sequence" : "Unsequence",
targetTsFileResource.getTsFile().getName(),
costTime,
selectedFileSize / 1024.0d / 1024.0d / costTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/**
* CompactionScheduler schedules and submits the compaction task periodically, and it counts the
Expand All @@ -56,40 +58,22 @@ public static void scheduleCompaction(TsFileManager tsFileManager, long timePart
return;
}
try {
tryToSubmitCrossSpaceCompactionTask(
tsFileManager.getStorageGroupName(),
tsFileManager.getDataRegionId(),
timePartition,
tsFileManager);
tryToSubmitInnerSpaceCompactionTask(
tsFileManager.getStorageGroupName(),
tsFileManager.getDataRegionId(),
timePartition,
tsFileManager,
true);
tryToSubmitInnerSpaceCompactionTask(
tsFileManager.getStorageGroupName(),
tsFileManager.getDataRegionId(),
timePartition,
tsFileManager,
false);
tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition);
tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition);
} catch (InterruptedException e) {
LOGGER.error("Exception occurs when selecting compaction tasks", e);
Thread.currentThread().interrupt();
}
}

public static void tryToSubmitInnerSpaceCompactionTask(
String storageGroupName,
String dataRegionId,
long timePartition,
TsFileManager tsFileManager,
boolean sequence)
throws InterruptedException {
private static List<List<TsFileResource>> selectInnerSpaceCompactionTask(
long timePartition, TsFileManager tsFileManager, boolean sequence) {
if ((!config.isEnableSeqSpaceCompaction() && sequence)
|| (!config.isEnableUnseqSpaceCompaction() && !sequence)) {
return;
return Collections.emptyList();
}
String storageGroupName = tsFileManager.getStorageGroupName();
String dataRegionId = tsFileManager.getDataRegionId();

ICompactionSelector innerSpaceCompactionSelector = null;
if (sequence) {
Expand All @@ -103,44 +87,76 @@ public static void tryToSubmitInnerSpaceCompactionTask(
.getInnerUnsequenceCompactionSelector()
.createInstance(storageGroupName, dataRegionId, timePartition, tsFileManager);
}
List<List<TsFileResource>> taskList =
innerSpaceCompactionSelector.selectInnerSpaceTask(
sequence
? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
: tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
for (List<TsFileResource> task : taskList) {
ICompactionPerformer performer =
sequence
? IoTDBDescriptor.getInstance()
.getConfig()
.getInnerSeqCompactionPerformer()
.createInstance()
: IoTDBDescriptor.getInstance()
.getConfig()
.getInnerUnseqCompactionPerformer()
.createInstance();
CompactionTaskManager.getInstance()
.addTaskToWaitingQueue(
new InnerSpaceCompactionTask(
timePartition,
tsFileManager,
task,
sequence,
performer,
CompactionTaskManager.currentTaskNum,
tsFileManager.getNextCompactionTaskId()));

return innerSpaceCompactionSelector.selectInnerSpaceTask(
sequence
? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
: tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
}

public static void tryToSubmitInnerSpaceCompactionTask(
TsFileManager tsFileManager, long timePartition) throws InterruptedException {
List<List<TsFileResource>> seqTaskList =
selectInnerSpaceCompactionTask(timePartition, tsFileManager, true);
List<List<TsFileResource>> unseqTaskList =
selectInnerSpaceCompactionTask(timePartition, tsFileManager, false);
int taskFreeSize =
config.getCandidateCompactionTaskQueueSize()
- CompactionTaskManager.getInstance().getCompactionCandidateTaskCount();
int taskSize = Math.max(seqTaskList.size(), unseqTaskList.size());
for (int i = 0; i < taskSize; i++) {
if (taskFreeSize <= 0) {
break;
}
// submit one seq inner space task
if (i < seqTaskList.size()) {
submitInnerTask(seqTaskList.get(i), tsFileManager, timePartition, true);
taskFreeSize--;
}

// submit one unseq inner space task
if (i < unseqTaskList.size()) {
submitInnerTask(unseqTaskList.get(i), tsFileManager, timePartition, false);
taskFreeSize--;
}
}
}

private static void tryToSubmitCrossSpaceCompactionTask(
String logicalStorageGroupName,
String dataRegionId,
private static void submitInnerTask(
List<TsFileResource> taskList,
TsFileManager tsFileManager,
long timePartition,
TsFileManager tsFileManager)
boolean sequence)
throws InterruptedException {
ICompactionPerformer performer =
sequence
? IoTDBDescriptor.getInstance()
.getConfig()
.getInnerSeqCompactionPerformer()
.createInstance()
: IoTDBDescriptor.getInstance()
.getConfig()
.getInnerUnseqCompactionPerformer()
.createInstance();
CompactionTaskManager.getInstance()
.addTaskToWaitingQueue(
new InnerSpaceCompactionTask(
timePartition,
tsFileManager,
taskList,
sequence,
performer,
CompactionTaskManager.currentTaskNum,
tsFileManager.getNextCompactionTaskId()));
}

private static void tryToSubmitCrossSpaceCompactionTask(
TsFileManager tsFileManager, long timePartition) throws InterruptedException {
if (!config.isEnableCrossSpaceCompaction()) {
return;
}
String logicalStorageGroupName = tsFileManager.getStorageGroupName();
String dataRegionId = tsFileManager.getDataRegionId();
ICrossSpaceSelector crossSpaceCompactionSelector =
config
.getCrossCompactionSelector()
Expand All @@ -149,7 +165,10 @@ private static void tryToSubmitCrossSpaceCompactionTask(
crossSpaceCompactionSelector.selectCrossSpaceTask(
tsFileManager.getOrCreateSequenceListByTimePartition(timePartition),
tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
List<Long> memoryCost = crossSpaceCompactionSelector.getCompactionMemoryCost();
List<Long> memoryCost =
taskList.stream()
.map(CrossCompactionTaskResource::getTotalMemoryCost)
.collect(Collectors.toList());
for (int i = 0, size = taskList.size(); i < size; ++i) {
CompactionTaskManager.getInstance()
.addTaskToWaitingQueue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class CompactionTaskManager implements IService {

private static final CompactionTaskManager INSTANCE = new CompactionTaskManager();

private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

// The thread pool that executes the compaction task. The default number of threads for this pool
// is 10.
private WrappedThreadPoolExecutor taskExecutionPool;
Expand All @@ -67,7 +69,8 @@ public class CompactionTaskManager implements IService {

public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0);
private final FixedPriorityBlockingQueue<AbstractCompactionTask> candidateCompactionTaskQueue =
new FixedPriorityBlockingQueue<>(1024, new DefaultCompactionTaskComparatorImpl());
new FixedPriorityBlockingQueue<>(
config.getCandidateCompactionTaskQueueSize(), new DefaultCompactionTaskComparatorImpl());
// <StorageGroup-DataRegionId,futureSet>, it is used to store all compaction tasks under each
// virtualStorageGroup
private final Map<String, Map<AbstractCompactionTask, Future<CompactionTaskSummary>>>
Expand All @@ -76,7 +79,6 @@ public class CompactionTaskManager implements IService {

private final RateLimiter mergeWriteRateLimiter = RateLimiter.create(Double.MAX_VALUE);

private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private volatile boolean init = false;

public static CompactionTaskManager getInstance() {
Expand Down Expand Up @@ -219,7 +221,8 @@ public synchronized boolean addTaskToWaitingQueue(AbstractCompactionTask compact
throws InterruptedException {
if (init
&& !candidateCompactionTaskQueue.contains(compactionTask)
&& !isTaskRunning(compactionTask)) {
&& !isTaskRunning(compactionTask)
&& candidateCompactionTaskQueue.size() < config.getCandidateCompactionTaskQueueSize()) {
compactionTask.setSourceFilesToCompactionCandidate();
candidateCompactionTaskQueue.put(compactionTask);

Expand Down Expand Up @@ -324,6 +327,10 @@ public int getTotalTaskCount() {
return getExecutingTaskCount() + candidateCompactionTaskQueue.size();
}

public int getCompactionCandidateTaskCount() {
return candidateCompactionTaskQueue.size();
}

public synchronized List<AbstractCompactionTask> getRunningCompactionTaskList() {
List<AbstractCompactionTask> tasks = new ArrayList<>();
for (Map<AbstractCompactionTask, Future<CompactionTaskSummary>> runningTaskMap :
Expand Down
Loading