Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner;

import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
Expand All @@ -39,14 +36,11 @@
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.concurrent.atomic.AtomicReference;

public class PipeDataRegionAssigner implements Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataRegionAssigner.class);

private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();

/**
* The {@link PipeDataRegionMatcher} is used to match the event with the extractor based on the
* pattern.
Expand All @@ -58,9 +52,6 @@ public class PipeDataRegionAssigner implements Closeable {

private final String dataRegionId;

private final AtomicReference<ProgressIndex> maxProgressIndexForRealtimeEvent =
new AtomicReference<>(MinimumProgressIndex.INSTANCE);

private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter();

public String getDataRegionId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public class PipeTsFileResourceManager {
hardlinkOrCopiedFileToPipeTsFileResourceMap = new ConcurrentHashMap<>();
private final PipeTsFileResourceSegmentLock segmentLock = new PipeTsFileResourceSegmentLock();

public File increaseFileReference(
final File file, final boolean isTsFile, final @Nullable String pipeName) throws IOException {
return increaseFileReference(file, isTsFile, pipeName, null);
}

/**
* Given a file, create a hardlink or copy it to pipe dir, maintain a reference count for the
* hardlink or copied file, and return the hardlink or copied file.
Expand All @@ -72,16 +77,24 @@ public class PipeTsFileResourceManager {
* @param file tsfile, resource file or mod file. can be original file or hardlink/copy of
* original file
* @param isTsFile {@code true} to create hardlink, {@code false} to copy file
* @param pipeName Nonnull if the pipe is from historical or assigner -> extractors, null if is
* dataRegion -> assigner
* @param sourceFile for inner use, historical extractor will use this to create hardlink from
* pipe tsFile -> common tsFile
* @return the hardlink or copied file
* @throws IOException when create hardlink or copy file failed
*/
public File increaseFileReference(
final File file, final boolean isTsFile, final @Nullable String pipeName) throws IOException {
private File increaseFileReference(
final File file,
final boolean isTsFile,
final @Nullable String pipeName,
final @Nullable File sourceFile)
throws IOException {
// If the file is already a hardlink or copied file,
// just increase reference count and return it
segmentLock.lock(file);
try {
if (increaseReferenceIfExists(file, pipeName)) {
if (increaseReferenceIfExists(file, pipeName, isTsFile)) {
return file;
}
} finally {
Expand All @@ -90,19 +103,22 @@ public File increaseFileReference(

// If the file is not a hardlink or copied file, check if there is a related hardlink or
// copied file in pipe dir. if so, increase reference count and return it
final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file, pipeName);
final File hardlinkOrCopiedFile =
Objects.isNull(sourceFile) ? getHardlinkOrCopiedFileInPipeDir(file, pipeName) : file;
segmentLock.lock(hardlinkOrCopiedFile);
try {
if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName)) {
if (increaseReferenceIfExists(hardlinkOrCopiedFile, pipeName, isTsFile)) {
return getResourceMap(pipeName).get(hardlinkOrCopiedFile.getPath()).getFile();
}

// If the file is a tsfile, create a hardlink in pipe dir and will return it.
// otherwise, copy the file (.mod or .resource) to pipe dir and will return it.
final File source = Objects.isNull(sourceFile) ? file : sourceFile;

final File resultFile =
isTsFile
? FileUtils.createHardLink(file, hardlinkOrCopiedFile)
: FileUtils.copyFile(file, hardlinkOrCopiedFile);
? FileUtils.createHardLink(source, hardlinkOrCopiedFile)
: FileUtils.copyFile(source, hardlinkOrCopiedFile);

// If the file is not a hardlink or copied file, and there is no related hardlink or copied
// file in pipe dir, create a hardlink or copy it to pipe dir, maintain a reference count for
Expand All @@ -116,42 +132,34 @@ public File increaseFileReference(
resultFile.getPath(), new PipeTsFilePublicResource(resultFile));
}

increasePublicReference(resultFile, pipeName);
increasePublicReference(resultFile, pipeName, isTsFile);

return resultFile;
} finally {
segmentLock.unlock(hardlinkOrCopiedFile);
}
}

private boolean increaseReferenceIfExists(final File file, final @Nullable String pipeName) {
private boolean increaseReferenceIfExists(
final File file, final @Nullable String pipeName, final boolean isTsFile) throws IOException {
final String path = file.getPath();
final PipeTsFileResource resource = getResourceMap(pipeName).get(path);
if (resource != null) {
resource.increaseReferenceCount();
increasePublicReference(file, pipeName);
increasePublicReference(file, pipeName, isTsFile);
return true;
}
return false;
}

private void increasePublicReference(final File file, final String pipeName) {
private void increasePublicReference(
final File file, final String pipeName, final boolean isTsFile) throws IOException {
if (Objects.isNull(pipeName)) {
return;
}
// Increase the assigner's file to avoid hard-link or memory cache cleaning
// Note that it does not exist for historical files
final String path = getCommonFilePath(file);
hardlinkOrCopiedFileToTsFilePublicResourceMap.compute(
path,
(k, v) -> {
if (Objects.isNull(v)) {
return new PipeTsFilePublicResource(new File(path));
} else {
v.increaseReferenceCount();
return v;
}
});
increaseFileReference(new File(getCommonFilePath(file)), isTsFile, null, file);
}

public static File getHardlinkOrCopiedFileInPipeDir(
Expand Down Expand Up @@ -228,13 +236,7 @@ private void decreasePublicReferenceIfExists(final File file, final @Nullable St
}
// Increase the assigner's file to avoid hard-link or memory cache cleaning
// Note that it does not exist for historical files
final String commonFilePath = getCommonFilePath(file);
if (hardlinkOrCopiedFileToTsFilePublicResourceMap.containsKey(commonFilePath)
&& hardlinkOrCopiedFileToTsFilePublicResourceMap
.get(commonFilePath)
.decreaseReferenceCount()) {
hardlinkOrCopiedFileToPipeTsFileResourceMap.remove(commonFilePath);
}
decreaseFileReference(new File(getCommonFilePath(file)), null);
}

// Warning: Shall not be called by the assigner
Expand Down
Loading