Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public abstract class AbstractCompactionTask {
protected boolean crossTask;
protected boolean innerSeqTask;

protected long memoryCost = 0L;

public AbstractCompactionTask(
String storageGroupName,
String dataRegionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
protected List<TsFileResource> holdWriteLockList = new ArrayList<>();
protected double selectedSeqFileSize = 0;
protected double selectedUnseqFileSize = 0;
protected long memoryCost = 0L;

public CrossSpaceCompactionTask(
long timePartition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,21 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.TsFileMetricManager;
import org.apache.iotdb.db.engine.compaction.execute.exception.CompactionExceptionHandler;
import org.apache.iotdb.db.engine.compaction.execute.exception.CompactionMemoryNotEnoughException;
import org.apache.iotdb.db.engine.compaction.execute.performer.ICompactionPerformer;
import org.apache.iotdb.db.engine.compaction.execute.performer.impl.FastCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.execute.task.subtask.FastCompactionTaskSummary;
import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger;
import org.apache.iotdb.db.engine.compaction.selector.estimator.AbstractInnerSpaceEstimator;
import org.apache.iotdb.db.engine.compaction.selector.estimator.FastCompactionInnerCompactionEstimator;
import org.apache.iotdb.db.engine.compaction.selector.estimator.ReadChunkInnerCompactionEstimator;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
Expand Down Expand Up @@ -69,6 +74,8 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {

protected long maxModsFileSize;

protected AbstractInnerSpaceEstimator innerSpaceEstimator;

public InnerSpaceCompactionTask(
long timePartition,
TsFileManager tsFileManager,
Expand All @@ -87,6 +94,13 @@ public InnerSpaceCompactionTask(
this.selectedTsFileResourceList = selectedTsFileResourceList;
this.sequence = sequence;
this.performer = performer;
if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMemControl()) {
if (this.performer instanceof ReadChunkInnerCompactionEstimator) {
innerSpaceEstimator = new ReadChunkInnerCompactionEstimator();
} else if (!sequence && this.performer instanceof FastCompactionInnerCompactionEstimator) {
innerSpaceEstimator = new FastCompactionInnerCompactionEstimator();
}
}
isHoldingReadLock = new boolean[selectedTsFileResourceList.size()];
isHoldingWriteLock = new boolean[selectedTsFileResourceList.size()];
for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
Expand Down Expand Up @@ -317,6 +331,7 @@ protected boolean doCompaction() {
isSequence());
}
} finally {
SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost);
releaseAllLocksAndResetStatus();
return isSuccess;
}
Expand Down Expand Up @@ -458,9 +473,27 @@ public boolean checkValidAndSetMerging() {
return false;
}
}
if (innerSpaceEstimator != null) {
memoryCost = innerSpaceEstimator.estimateInnerCompactionMemory(selectedTsFileResourceList);
}
SystemInfo.getInstance().addCompactionMemoryCost(memoryCost, 60);
} catch (Throwable e) {
if (e instanceof InterruptedException) {
LOGGER.warn("Interrupted when allocating memory for compaction", e);
Thread.currentThread().interrupt();
} else if (e instanceof CompactionMemoryNotEnoughException) {
LOGGER.warn("No enough memory for current compaction task {}", this, e);
}
releaseAllLocksAndResetStatus();
throw e;
return false;
} finally {
try {
if (innerSpaceEstimator != null) {
innerSpaceEstimator.close();
}
} catch (IOException e) {
LOGGER.warn("Failed to close InnerSpaceCompactionMemoryEstimator");
}
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.iotdb.db.engine.compaction.execute.performer.constant.CrossCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.selector.estimator.AbstractCompactionEstimator;
import org.apache.iotdb.db.engine.compaction.selector.estimator.ReadPointCrossCompactionEstimator;
import org.apache.iotdb.db.engine.compaction.selector.estimator.FastCrossSpaceCompactionEstimator;
import org.apache.iotdb.db.engine.compaction.selector.utils.CrossCompactionTaskResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;

Expand Down Expand Up @@ -66,7 +66,7 @@ static AbstractCompactionEstimator getCompactionEstimator(
case READ_POINT:
case FAST:
if (!isInnerSpace) {
return new ReadPointCrossCompactionEstimator();
return new FastCrossSpaceCompactionEstimator();
}
default:
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,55 +16,128 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.engine.compaction.selector.estimator;

import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.CompressionRatio;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex;
import org.apache.iotdb.db.engine.storagegroup.timeindex.FileTimeIndex;
import org.apache.iotdb.db.engine.storagegroup.timeindex.ITimeIndex;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Estimate the memory cost of one compaction task with specific source files based on its
* corresponding implementation.
*/
public abstract class AbstractCompactionEstimator {
public abstract class AbstractCompactionEstimator implements Closeable {

protected Map<TsFileResource, TsFileSequenceReader> fileReaderCache = new HashMap<>();
protected Map<TsFileResource, FileInfo> fileInfoCache = new HashMap<>();
protected Map<TsFileResource, DeviceTimeIndex> deviceTimeIndexCache = new HashMap<>();

protected IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

protected long compressionRatio = (long) CompressionRatio.getInstance().getRatio() + 1;

/**
* Estimate the memory cost of compacting the unseq file and its corresponding overlapped seq
* files in cross space compaction task.
*/
public abstract long estimateCrossCompactionMemory(
List<TsFileResource> seqResources, TsFileResource unseqResource) throws IOException;
protected abstract long calculatingMetadataMemoryCost(CompactionTaskInfo taskInfo);

/** Estimate the memory cost of compacting the source files in inner space compaction task. */
public abstract long estimateInnerCompactionMemory(List<TsFileResource> resources);
protected abstract long calculatingDataMemoryCost(CompactionTaskInfo taskInfo) throws IOException;

/** Construct a new or get an existing TsFileSequenceReader of a TsFile. */
protected TsFileSequenceReader getFileReader(TsFileResource tsFileResource) throws IOException {
TsFileSequenceReader reader = fileReaderCache.get(tsFileResource);
if (reader == null) {
reader = new TsFileSequenceReader(tsFileResource.getTsFilePath(), true, false);
fileReaderCache.put(tsFileResource, reader);
protected CompactionTaskInfo calculatingCompactionTaskInfo(List<TsFileResource> resources)
throws IOException {
List<FileInfo> fileInfoList = new ArrayList<>();
for (TsFileResource resource : resources) {
FileInfo fileInfo = getFileInfoFromCache(resource);
fileInfoList.add(fileInfo);
}
return reader;
return new CompactionTaskInfo(resources, fileInfoList);
}

public void close() throws IOException {
for (TsFileSequenceReader reader : fileReaderCache.values()) {
reader.close();
private FileInfo getFileInfoFromCache(TsFileResource resource) throws IOException {
if (fileInfoCache.containsKey(resource)) {
return fileInfoCache.get(resource);
}
try (TsFileSequenceReader reader =
new TsFileSequenceReader(resource.getTsFilePath(), true, false)) {
FileInfo fileInfo = CompactionEstimateUtils.calculateFileInfo(reader);
fileInfoCache.put(resource, fileInfo);
return fileInfo;
}
}

protected int calculatingMaxOverlapFileNumInSubCompactionTask(List<TsFileResource> resources)
throws IOException {
Set<String> devices = new HashSet<>();
List<DeviceTimeIndex> resourceDevices = new ArrayList<>(resources.size());
for (TsFileResource resource : resources) {
DeviceTimeIndex deviceTimeIndex = getDeviceTimeIndexFromCache(resource);
devices.addAll(deviceTimeIndex.getDevices());
resourceDevices.add(deviceTimeIndex);
}
int maxOverlapFileNumInSubCompactionTask = 1;
for (String device : devices) {
List<DeviceTimeIndex> resourcesContainsCurrentDevice =
resourceDevices.stream()
.filter(resource -> !resource.definitelyNotContains(device))
.sorted(Comparator.comparingLong(resource -> resource.getStartTime(device)))
.collect(Collectors.toList());
if (resourcesContainsCurrentDevice.size() < maxOverlapFileNumInSubCompactionTask) {
continue;
}

long maxEndTimeOfCurrentDevice = Long.MIN_VALUE;
int overlapFileNumOfCurrentDevice = 0;
for (DeviceTimeIndex resource : resourcesContainsCurrentDevice) {
long deviceStartTimeInCurrentFile = resource.getStartTime(device);
long deviceEndTimeInCurrentFile = resource.getEndTime(device);
if (deviceStartTimeInCurrentFile <= maxEndTimeOfCurrentDevice) {
// has overlap, update max end time
maxEndTimeOfCurrentDevice =
Math.max(maxEndTimeOfCurrentDevice, deviceEndTimeInCurrentFile);
overlapFileNumOfCurrentDevice++;
maxOverlapFileNumInSubCompactionTask =
Math.max(maxOverlapFileNumInSubCompactionTask, overlapFileNumOfCurrentDevice);
} else {
// reset max end time and overlap file num of current device
maxEndTimeOfCurrentDevice = deviceEndTimeInCurrentFile;
overlapFileNumOfCurrentDevice = 1;
}
}
// already reach the max value
if (maxOverlapFileNumInSubCompactionTask == resources.size()) {
return maxOverlapFileNumInSubCompactionTask;
}
}
fileReaderCache.clear();
return maxOverlapFileNumInSubCompactionTask;
}

private DeviceTimeIndex getDeviceTimeIndexFromCache(TsFileResource resource) throws IOException {
if (deviceTimeIndexCache.containsKey(resource)) {
return deviceTimeIndexCache.get(resource);
}
ITimeIndex timeIndex = resource.getTimeIndex();
if (timeIndex instanceof FileTimeIndex) {
timeIndex = resource.buildDeviceTimeIndex();
}
deviceTimeIndexCache.put(resource, (DeviceTimeIndex) timeIndex);
return (DeviceTimeIndex) timeIndex;
}

public void close() throws IOException {
deviceTimeIndexCache.clear();
fileInfoCache.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,41 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.engine.compaction.selector.estimator;

import org.apache.iotdb.db.engine.storagegroup.TsFileResource;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* Estimate the memory cost of one cross space compaction task with specific source files based on
* its corresponding implementation.
*/
public abstract class AbstractCrossSpaceEstimator extends AbstractCompactionEstimator {
public abstract long estimateCrossCompactionMemory(
List<TsFileResource> seqResources, TsFileResource unseqResource) throws IOException;

public long estimateInnerCompactionMemory(List<TsFileResource> resources) {
throw new RuntimeException(
"This kind of estimator cannot be used to estimate inner space compaction task");
public long estimateCrossCompactionMemory(
List<TsFileResource> seqResources, List<TsFileResource> unseqResources) throws IOException {
if (!config.isEnableCompactionMemControl()) {
return 0;
}
List<TsFileResource> resources = new ArrayList<>(seqResources.size() + unseqResources.size());
resources.addAll(seqResources);
resources.addAll(unseqResources);
if (!CompactionEstimateUtils.addReadLock(resources)) {
return -1L;
}

long cost = 0;
try {
CompactionTaskInfo taskInfo = calculatingCompactionTaskInfo(resources);
cost += calculatingMetadataMemoryCost(taskInfo);
cost += calculatingDataMemoryCost(taskInfo);
} finally {
CompactionEstimateUtils.releaseReadLock(resources);
}
return cost;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.engine.compaction.selector.estimator;

import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
Expand All @@ -28,11 +29,14 @@
* its corresponding implementation.
*/
public abstract class AbstractInnerSpaceEstimator extends AbstractCompactionEstimator {
public abstract long estimateInnerCompactionMemory(List<TsFileResource> resources);

public long estimateCrossCompactionMemory(
List<TsFileResource> seqResources, TsFileResource unseqResource) throws IOException {
throw new RuntimeException(
"This kind of estimator cannot be used to estimate cross space compaction task");
public long estimateInnerCompactionMemory(List<TsFileResource> resources) throws IOException {
if (!config.isEnableCompactionMemControl()) {
return 0;
}
CompactionTaskInfo taskInfo = calculatingCompactionTaskInfo(resources);
long cost = calculatingMetadataMemoryCost(taskInfo);
cost += calculatingDataMemoryCost(taskInfo);
return cost;
}
}
Loading