diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/sync/utils/SyncPathUtil.java b/node-commons/src/main/java/org/apache/iotdb/commons/sync/utils/SyncPathUtil.java index 157110d498c3d..3e29d3a4f266f 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/sync/utils/SyncPathUtil.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/sync/utils/SyncPathUtil.java @@ -88,6 +88,10 @@ public static String getSenderRealTimePipeLogDir(String pipeName, long createTim return getSenderPipeDir(pipeName, createTime) + File.separator + SyncConstant.PIPE_LOG_DIR_NAME; } + public static String getSenderFileDataDir(String dataDir, String pipeName, long createTime) { + return dataDir + File.separator + getSenderPipeDirName(pipeName, createTime); + } + public static String getSenderFileDataDir(String pipeName, long createTime) { return getSenderPipeDir(pipeName, createTime) + File.separator diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java index f1c46fe38f592..d77a6db5ce7e1 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.sync.utils.SyncConstant; import org.apache.iotdb.commons.sync.utils.SyncPathUtil; import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe; @@ -34,6 +35,7 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.nio.file.FileSystemException; import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; @@ -44,7 +46,13 @@ public class TsFilePipeLogger { private final String pipeDir; private final String tsFileDir; + private final String pipeName; + + private final long createTime; + public TsFilePipeLogger(TsFilePipe tsFilePipe) { + this.pipeName = tsFilePipe.getName(); + this.createTime = tsFilePipe.getCreateTime(); pipeDir = SyncPathUtil.getSenderPipeDir(tsFilePipe.getName(), tsFilePipe.getCreateTime()); tsFileDir = SyncPathUtil.getSenderFileDataDir(tsFilePipe.getName(), tsFilePipe.getCreateTime()); } @@ -108,15 +116,38 @@ public void createTsFileResourceHardlink(File tsFile) throws IOException { } private File createHardLink(File file) throws IOException { - File link = new File(tsFileDir, getRelativeFilePath(file)); - if (!link.getParentFile().exists()) { - link.getParentFile().mkdirs(); - } + try { + File link = new File(tsFileDir, getRelativeFilePath(file)); + if (!link.getParentFile().exists()) { + link.getParentFile().mkdirs(); + } - Path sourcePath = FileSystems.getDefault().getPath(file.getAbsolutePath()); - Path linkPath = FileSystems.getDefault().getPath(link.getAbsolutePath()); - Files.createLink(linkPath, sourcePath); - return link; + Path sourcePath = FileSystems.getDefault().getPath(file.getAbsolutePath()); + Path linkPath = FileSystems.getDefault().getPath(link.getAbsolutePath()); + Files.createLink(linkPath, sourcePath); + return link; + } catch (FileSystemException e) { + // Invalid cross-device link + // dataDir/sequence/root.xx/x/xx/tsfile + File dataDir = + file.getParentFile().getParentFile().getParentFile().getParentFile().getParentFile(); + for (String dir : IoTDBDescriptor.getInstance().getConfig().getDataDirs()) { + if (dataDir.getCanonicalPath().equals(new File(dir).getCanonicalPath())) { + File link = + new File( + SyncPathUtil.getSenderFileDataDir(dir, pipeName, createTime), + getRelativeFilePath(file)); + if (!link.getParentFile().exists()) { + link.getParentFile().mkdirs(); + } + Path sourcePath = FileSystems.getDefault().getPath(file.getAbsolutePath()); + Path linkPath = FileSystems.getDefault().getPath(link.getAbsolutePath()); + Files.createLink(linkPath, sourcePath); + return link; + } + } + throw e; + } } private String getRelativeFilePath(File file) { @@ -151,5 +182,12 @@ public void clear() throws IOException { if (pipeDir.exists()) { FileUtils.deleteDirectory(pipeDir); } + for (String dataDir : IoTDBDescriptor.getInstance().getConfig().getDataDirs()) { + File crossDiskSyncDir = + new File(SyncPathUtil.getSenderFileDataDir(dataDir, pipeName, createTime)); + if (crossDiskSyncDir.exists()) { + FileUtils.deleteDirectory(crossDiskSyncDir); + } + } } }