Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public enum ThreadName {
FLUSH_SERVICE("Flush"),
FLUSH_SUB_TASK_SERVICE("Flush-SubTask"),
COMPACTION_SERVICE("Compaction"),
COMPACTION_SUB_SERVICE("Sub-Compaction"),
COMPACTION_SCHEDULE("Compaction_Schedule"),
WAL_DAEMON("WAL-Sync"),
WAL_FORCE_DAEMON("WAL-Force"),
Expand Down
6 changes: 6 additions & 0 deletions server/src/assembly/resources/conf/iotdb-engine.properties
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,12 @@ timestamp_precision=ms
# Datatype: int
# query_timeout_threshold=60000

# The number of sub compaction threads to be set up to perform compaction.
# Currently only works for nonAligned data in cross space compaction and unseq inner space compaction.
# When the configured value is less than or equal to 0, it will be set to 1.
# Datatype: int
# sub_compaction_thread_num=4

####################
### Metadata Cache Configuration
####################
Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,12 @@ public class IoTDBConfig {
/** The interval of compaction task submission from queue in CompactionTaskManager */
private long compactionSubmissionIntervalInMs = 60_000L;

/**
* The number of sub compaction threads to be set up to perform compaction. Currently only works
* for nonAligned data in cross space compaction and unseq inner space compaction.
*/
private int subCompactionTaskNum = 4;

/** whether to cache meta data(ChunkMetaData and TsFileMetaData) or not. */
private boolean metaDataCacheEnable = true;

Expand Down Expand Up @@ -2515,6 +2521,14 @@ public void setCompactionSubmissionIntervalInMs(long interval) {
compactionSubmissionIntervalInMs = interval;
}

/**
 * Returns the number of sub compaction threads used to parallelize a single compaction task.
 *
 * @return the configured sub compaction thread count
 */
public int getSubCompactionTaskNum() {
  return this.subCompactionTaskNum;
}

/**
 * Sets the number of sub compaction threads used to parallelize a single compaction task.
 *
 * <p>Values less than or equal to 0 are clamped to 1, matching the contract documented for
 * {@code sub_compaction_thread_num} in iotdb-engine.properties, so that every caller (not only
 * the properties loader) ends up with a usable thread count.
 *
 * @param subCompactionTaskNum desired sub compaction thread count; clamped to a minimum of 1
 */
public void setSubCompactionTaskNum(int subCompactionTaskNum) {
  // Enforce the documented minimum here instead of relying on each caller to pre-validate.
  this.subCompactionTaskNum = Math.max(1, subCompactionTaskNum);
}

public String getDeviceIDTransformationMethod() {
return deviceIDTransformationMethod;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,13 @@ private void loadProps() {
properties.getProperty(
"compaction_priority", conf.getCompactionPriority().toString())));

int subtaskNum =
Integer.parseInt(
properties.getProperty(
"sub_compaction_thread_num", Integer.toString(conf.getSubCompactionTaskNum())));
subtaskNum = subtaskNum <= 0 ? 1 : subtaskNum;
conf.setSubCompactionTaskNum(subtaskNum);

conf.setQueryTimeoutThreshold(
Integer.parseInt(
properties.getProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
Expand All @@ -55,6 +56,10 @@ public class CompactionTaskManager implements IService {
// The thread pool that executes the compaction task. The default number of threads for this pool
// is 10.
private WrappedScheduledExecutorService taskExecutionPool;

// The thread pool that executes the sub compaction task.
private ScheduledExecutorService subCompactionTaskExecutionPool;

public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0);
private FixedPriorityBlockingQueue<AbstractCompactionTask> candidateCompactionTaskQueue =
new FixedPriorityBlockingQueue<>(1024, new CompactionTaskComparator());
Expand Down Expand Up @@ -86,6 +91,11 @@ public synchronized void start() {
IoTDBThreadPoolFactory.newScheduledThreadPool(
IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread(),
ThreadName.COMPACTION_SERVICE.getName());
this.subCompactionTaskExecutionPool =
IoTDBThreadPoolFactory.newScheduledThreadPool(
IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
* IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(),
ThreadName.COMPACTION_SUB_SERVICE.getName());
currentTaskNum = new AtomicInteger(0);
compactionTaskSubmissionThreadPool =
IoTDBThreadPoolFactory.newScheduledThreadPool(1, ThreadName.COMPACTION_SERVICE.getName());
Expand Down Expand Up @@ -310,6 +320,14 @@ public synchronized void submitTask(AbstractCompactionTask compactionTask)
: "taskExecutionPool is terminated");
}

/**
 * Submits a sub compaction task to the dedicated sub compaction thread pool.
 *
 * @param subCompactionTask the callable performing one slice of a compaction task
 * @return a {@link Future} tracking the task, or {@code null} if the pool has not been started
 *     or has already terminated — callers must handle the null case
 */
public synchronized Future<Void> submitSubTask(Callable<Void> subCompactionTask) {
  if (subCompactionTaskExecutionPool != null && !subCompactionTaskExecutionPool.isTerminated()) {
    // Return the future directly; the intermediate local variable added nothing.
    return subCompactionTaskExecutionPool.submit(subCompactionTask);
  }
  return null;
}

/**
* Abort all compactions of a storage group. The running compaction tasks will be returned as a
* list, the compaction threads for the storage group are not terminated util all the tasks in the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.cross.rewrite.task.SubCompactionTask;
import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
import org.apache.iotdb.db.engine.compaction.writer.CrossSpaceCompactionWriter;
Expand All @@ -44,35 +45,41 @@
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

/**
* This tool can be used to perform inner space or cross space compaction of aligned and non aligned
* timeseries . Currently, we use {@link
* org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils} to speed up if it is
* an inner space compaction.
a seq inner space compaction.
*/
public class CompactionUtils {
private static final Logger logger =
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
private static final int subTaskNum =
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();

public static void compact(
List<TsFileResource> seqFileResources,
Expand Down Expand Up @@ -108,6 +115,7 @@ public static void compact(
}

compactionWriter.endFile();
updateDeviceStartTimeAndEndTime(targetFileResources, compactionWriter);
updatePlanIndexes(targetFileResources, seqFileResources, unseqFileResources);
} finally {
QueryResourceManager.getInstance().endQuery(queryId);
Expand Down Expand Up @@ -157,9 +165,9 @@ private static void compactAlignedSeries(
if (dataBatchReader.hasNextBatch()) {
// chunkgroup is serialized only when at least one timeseries under this device has data
compactionWriter.startChunkGroup(device, true);
compactionWriter.startMeasurement(measurementSchemas);
writeWithReader(compactionWriter, dataBatchReader);
compactionWriter.endMeasurement();
compactionWriter.startMeasurement(measurementSchemas, 0);
writeWithReader(compactionWriter, dataBatchReader, 0);
compactionWriter.endMeasurement(0);
compactionWriter.endChunkGroup();
}
}
Expand All @@ -170,59 +178,58 @@ private static void compactNonAlignedSeries(
AbstractCompactionWriter compactionWriter,
QueryContext queryContext,
QueryDataSource queryDataSource)
throws MetadataException, IOException {
boolean hasStartChunkGroup = false;
throws IOException, InterruptedException {
MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
deviceIterator.iterateNotAlignedSeries(device, false);
Set<String> allMeasurements = measurementIterator.getAllMeasurements();
int subTaskNums = Math.min(allMeasurements.size(), subTaskNum);

// assign all measurements to different sub tasks
Set<String>[] measurementsForEachSubTask = new HashSet[subTaskNums];
int idx = 0;
for (String measurement : allMeasurements) {
List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
try {
if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
measurementSchemas.add(IDTableManager.getInstance().getSeriesSchema(device, measurement));
} else {
measurementSchemas.add(
IoTDB.schemaProcessor.getSeriesSchema(new PartialPath(device, measurement)));
}
} catch (PathNotExistException e) {
logger.info("A deleted path is skipped: {}", e.getMessage());
continue;
if (measurementsForEachSubTask[idx % subTaskNums] == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to hash here? If the hash is not uniform, then the effect of multithreading could not be worse. Why not just use a concurrency stack or queue, each sub-thread get a measurement to compact when it is available.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each sub task has its own hashset, so this won't happen.

measurementsForEachSubTask[idx % subTaskNums] = new HashSet<String>();
}
measurementsForEachSubTask[idx++ % subTaskNums].add(measurement);
}

IBatchReader dataBatchReader =
constructReader(
device,
Collections.singletonList(measurement),
measurementSchemas,
allMeasurements,
queryContext,
queryDataSource,
false);
// construct sub tasks and start compacting measurements in parallel
List<Future<Void>> futures = new ArrayList<>();
compactionWriter.startChunkGroup(device, false);
for (int i = 0; i < subTaskNums; i++) {
futures.add(
CompactionTaskManager.getInstance()
.submitSubTask(
new SubCompactionTask(
device,
measurementsForEachSubTask[i],
queryContext,
queryDataSource,
compactionWriter,
i)));
}

if (dataBatchReader.hasNextBatch()) {
if (!hasStartChunkGroup) {
// chunkgroup is serialized only when at least one timeseries under this device has
// data
compactionWriter.startChunkGroup(device, false);
hasStartChunkGroup = true;
}
compactionWriter.startMeasurement(measurementSchemas);
writeWithReader(compactionWriter, dataBatchReader);
compactionWriter.endMeasurement();
// wait for all sub tasks finish
for (int i = 0; i < subTaskNums; i++) {
try {
futures.get(i).get();
} catch (InterruptedException | ExecutionException e) {
logger.error("SubCompactionTask meet errors ", e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread.interrupted().

Thread.interrupted();
throw new InterruptedException();
}
}

if (hasStartChunkGroup) {
compactionWriter.endChunkGroup();
}
compactionWriter.endChunkGroup();
}

private static void writeWithReader(AbstractCompactionWriter writer, IBatchReader reader)
throws IOException {
public static void writeWithReader(
AbstractCompactionWriter writer, IBatchReader reader, int subTaskId) throws IOException {
while (reader.hasNextBatch()) {
BatchData batchData = reader.nextBatch();
while (batchData.hasCurrent()) {
writer.write(batchData.currentTime(), batchData.currentValue());
writer.write(batchData.currentTime(), batchData.currentValue(), subTaskId);
batchData.next();
}
}
Expand All @@ -232,7 +239,7 @@ private static void writeWithReader(AbstractCompactionWriter writer, IBatchReade
* @param measurementIds if device is aligned, then measurementIds contain all measurements. If
* device is not aligned, then measurementIds only contain one measurement.
*/
private static IBatchReader constructReader(
public static IBatchReader constructReader(
String deviceId,
List<String> measurementIds,
List<IMeasurementSchema> measurementSchemas,
Expand Down Expand Up @@ -268,6 +275,29 @@ private static AbstractCompactionWriter getCompactionWriter(
}
}

/**
 * Syncs each target resource's per-device start/end times with the timeseries statistics
 * recorded by its corresponding file writer. The two lists are parallel: writer i produced
 * resource i.
 */
private static void updateDeviceStartTimeAndEndTime(
    List<TsFileResource> targetResources, AbstractCompactionWriter compactionWriter) {
  List<TsFileIOWriter> writers = compactionWriter.getFileIOWriter();
  for (int idx = 0; idx < writers.size(); idx++) {
    TsFileResource resource = targetResources.get(idx);
    // A tmp target file may hold no data points (e.g. everything was masked by mods files)
    // and gets deleted after compaction, so a missing file is skipped rather than updated.
    if (!resource.getTsFile().exists()) {
      continue;
    }
    writers
        .get(idx)
        .getDeviceTimeseriesMetadataMap()
        .forEach(
            (device, timeseriesMetadataList) -> {
              for (TimeseriesMetadata metadata : timeseriesMetadataList) {
                resource.updateStartTime(device, metadata.getStatistics().getStartTime());
                resource.updateEndTime(device, metadata.getStatistics().getEndTime());
              }
            });
  }
}

private static void updatePlanIndexes(
List<TsFileResource> targetResources,
List<TsFileResource> seqResources,
Expand All @@ -280,7 +310,7 @@ private static void updatePlanIndexes(
// in the new file
for (int i = 0; i < targetResources.size(); i++) {
TsFileResource targetResource = targetResources.get(i);
// remove the target file been deleted from list
// remove the target file that has been deleted from list
if (!targetResource.getTsFile().exists()) {
targetResources.remove(i--);
continue;
Expand Down
Loading