From e2956df0ceeced45ac36cf61ba4ce40498be54f4 Mon Sep 17 00:00:00 2001 From: wangtao Date: Thu, 23 Nov 2023 21:46:43 +0800 Subject: [PATCH 01/10] support copy for ArcticFileIO --- .../com/netease/arctic/io/ArcticFileIO.java | 7 +++++++ .../netease/arctic/io/ArcticHadoopFileIO.java | 19 +++++++++++++++++++ .../arctic/io/RecoverableArcticFileIO.java | 5 +++++ 3 files changed, 31 insertions(+) diff --git a/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java b/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java index fa9ccbb7d7..ddafc895aa 100644 --- a/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java +++ b/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java @@ -62,6 +62,13 @@ public interface ArcticFileIO extends FileIO, Configurable { */ void rename(String oldPath, String newPath); + /** + * Copy file from old path to new path + * @param oldPath - source path + * @param newPath - target path + */ + void copy(String oldPath, String newPath); + /** Delete a directory recursively * * @param path the path to delete. diff --git a/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java b/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java index 8e7ca1b879..7b3dfef68b 100644 --- a/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java +++ b/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java @@ -21,6 +21,7 @@ import com.netease.arctic.table.TableMetaStore; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; @@ -179,6 +180,24 @@ public void rename(String src, String dts) { }); } + @Override + public void copy(String src, String dts) { + tableMetaStore.doAs(() -> { + Path srcPath = new Path(src); + Path dtsPath = new Path(dts); + FileSystem fs = getFs(srcPath); + if (fs.isDirectory(srcPath)) { + throw new IllegalArgumentException("can't copy directory"); + } + try { + FileUtil.copy(fs, srcPath, fs, dtsPath, false, true, conf()); + } catch (IOException e) { + throw new UncheckedIOException("Fail to rename: from " + src + " to " + dts, e); + } + return null; + }); + } + @Override public T doAs(Callable callable) { return tableMetaStore.doAs(callable); diff --git a/core/src/main/java/com/netease/arctic/io/RecoverableArcticFileIO.java b/core/src/main/java/com/netease/arctic/io/RecoverableArcticFileIO.java index 52e6f7db6c..82befa11b6 100644 --- a/core/src/main/java/com/netease/arctic/io/RecoverableArcticFileIO.java +++ b/core/src/main/java/com/netease/arctic/io/RecoverableArcticFileIO.java @@ -55,6 +55,11 @@ public void rename(String oldPath, String newPath) { fileIO.rename(oldPath, newPath); } + @Override + public void copy(String oldPath, String newPath) { + fileIO.copy(oldPath, newPath); + } + @Override public void deleteDirectoryRecursively(String path) { //Do not move trash when deleting directory as it is used for dropping table only From f391dae062798e0f512b2c12c0acc4e9eaf16067 Mon Sep 17 00:00:00 2001 From: wangtao Date: Fri, 24 Nov 2023 17:54:04 +0800 Subject: [PATCH 02/10] support copying files for optimizer executor --- .../properties/OptimizeTaskProperties.java | 1 + .../optimizer/operator/BaseTaskExecutor.java | 2 + .../operator/executor/MajorExecutor.java | 37 +++++++++++++++++++ .../optimizer/operator/executor/NodeTask.java | 10 +++++ .../TestSupportHiveMajorExecutor.java | 22 +++++++++++ 5 files changed, 72 insertions(+) diff 
--git a/ams/ams-api/src/main/java/com/netease/arctic/ams/api/properties/OptimizeTaskProperties.java b/ams/ams-api/src/main/java/com/netease/arctic/ams/api/properties/OptimizeTaskProperties.java index a787fa36df..f0b2984ddc 100644 --- a/ams/ams-api/src/main/java/com/netease/arctic/ams/api/properties/OptimizeTaskProperties.java +++ b/ams/ams-api/src/main/java/com/netease/arctic/ams/api/properties/OptimizeTaskProperties.java @@ -4,6 +4,7 @@ public class OptimizeTaskProperties { // optimize task properties public static final String ALL_FILE_COUNT = "all-file-cnt"; public static final String CUSTOM_HIVE_SUB_DIRECTORY = "custom-hive-sub-directory"; + public static final String ENABLE_COPY_FILES= "copy-files-enabled"; public static final String MAX_EXECUTE_TIME = "max-execute-time"; public static final String MOVE_FILES_TO_HIVE_LOCATION = "move-files-to-hive-location"; } diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskExecutor.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskExecutor.java index 494fed195f..79a076d1ca 100644 --- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskExecutor.java +++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskExecutor.java @@ -233,6 +233,8 @@ private NodeTask constructTask(ArcticTable table, OptimizeTask task, int attempt String customHiveSubdirectory = properties.get(OptimizeTaskProperties.CUSTOM_HIVE_SUB_DIRECTORY); nodeTask.setCustomHiveSubdirectory(customHiveSubdirectory); + nodeTask.setCopyFiles( + PropertyUtil.propertyAsBoolean(properties, OptimizeTaskProperties.ENABLE_COPY_FILES, false)); Long maxExecuteTime = PropertyUtil.propertyAsLong(properties, OptimizeTaskProperties.MAX_EXECUTE_TIME, TableProperties.SELF_OPTIMIZING_EXECUTE_TIMEOUT_DEFAULT); diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java index b7b73d47fe..17f5e275eb 100644 --- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java +++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java @@ -24,6 +24,9 @@ import com.netease.arctic.data.PrimaryKeyedFile; import com.netease.arctic.hive.io.reader.AdaptHiveGenericArcticDataReader; import com.netease.arctic.hive.io.writer.AdaptHiveGenericTaskWriterBuilder; +import com.netease.arctic.hive.table.SupportHive; +import com.netease.arctic.hive.utils.HiveTableUtil; +import com.netease.arctic.hive.utils.TableTypeUtil; import com.netease.arctic.optimizer.OptimizerConfig; import com.netease.arctic.scan.ArcticFileScanTask; import com.netease.arctic.scan.BasicArcticFileScanTask; @@ -33,8 +36,10 @@ import com.netease.arctic.table.KeyedTable; import com.netease.arctic.table.PrimaryKeySpec; import com.netease.arctic.table.WriteOperationKind; +import com.netease.arctic.utils.TableFileUtils; import org.apache.commons.collections.CollectionUtils; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.Schema; import org.apache.iceberg.TableProperties; @@ -42,6 +47,7 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.util.PropertyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
@@ -64,6 +70,21 @@ public OptimizeTaskResult execute() throws Exception { Iterable targetFiles; LOG.info("Start processing arctic table major optimize task {} of {}: {}", task.getTaskId(), task.getTableIdentifier(), task); + if (TableTypeUtil.isHive(table) && task.isCopyFiles()) { + if (!task.posDeleteFiles().isEmpty() || !task.deleteFiles().isEmpty()) { + LOG.info("Task {} of {} enables copy files, but has delete files {} {}, should not copy", task.getTaskId(), + task.getTableIdentifier(), task.posDeleteFiles().size(), task.deleteFiles().size()); + } else if (task.getCustomHiveSubdirectory() == null) { + LOG.info("Task {} of {} enables copy files, but custom hive directory is null, should not copy", + task.getTaskId(), + task.getTableIdentifier()); + } else { + LOG.info("Task {} of {} enables copy files, copy {} data files", task.getTaskId(), task.getTableIdentifier(), + task.dataFiles().size()); + targetFiles = table.io().doAs(() -> copyFiles(task.dataFiles(), task.getCustomHiveSubdirectory())); + return buildOptimizeResult(targetFiles); + } + } Map> deleteFileMap = groupDeleteFilesByNode(task.posDeleteFiles()); List dataFiles = task.dataFiles(); @@ -77,6 +98,22 @@ public OptimizeTaskResult execute() throws Exception { return buildOptimizeResult(targetFiles); } + private Iterable copyFiles(List dataFiles, String customHiveSubdirectory) { + return Iterables.transform(dataFiles, dataFile -> { + String hiveLocation = HiveTableUtil.newHiveDataLocation(((SupportHive) table).hiveLocation(), + table.spec(), dataFile.partition(), customHiveSubdirectory); + + String sourcePath = dataFile.path().toString(); + String targetPath = TableFileUtils.getNewFilePath(hiveLocation, sourcePath); + LOG.info("Start copy file from {} to {}", sourcePath, targetPath); + long startTime = System.currentTimeMillis(); + table.io().copy(sourcePath, targetPath); + LOG.info("Successfully copy file to {}, cost {} ms", targetPath, + System.currentTimeMillis() - startTime); + return DataFiles.builder(table.spec()).copy(dataFile).withPath(targetPath).build(); + }); + } + @Override public void close() { } diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/NodeTask.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/NodeTask.java index ef08172a43..227dd8a294 100644 --- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/NodeTask.java +++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/NodeTask.java @@ -58,6 +58,7 @@ public class NodeTask { private TableIdentifier tableIdentifier; private int attemptId; private String customHiveSubdirectory; + private boolean copyFiles; private Long maxExecuteTime; public NodeTask(List> baseFiles, @@ -258,6 +259,14 @@ public void setCustomHiveSubdirectory(String customHiveSubdirectory) { this.customHiveSubdirectory = customHiveSubdirectory; } + public boolean isCopyFiles() { + return copyFiles; + } + + public void setCopyFiles(boolean copyFiles) { + this.copyFiles = copyFiles; + } + public Long getMaxExecuteTime() { return maxExecuteTime; } @@ -277,6 +286,7 @@ public String toString() { .add("taskId", taskId) .add("attemptId", attemptId) .add("tableIdentifier", tableIdentifier) + .add("copyFiles", copyFiles) .add("baseFiles", baseFiles.size()) .add("insertFiles", insertFiles.size()) .add("deleteFiles", deleteFiles.size()) diff --git a/optimizer/src/test/java/com/netease/arctic/optimizer/operator/executor/TestSupportHiveMajorExecutor.java 
b/optimizer/src/test/java/com/netease/arctic/optimizer/operator/executor/TestSupportHiveMajorExecutor.java index 757fe0bf72..09adbd4a3c 100644 --- a/optimizer/src/test/java/com/netease/arctic/optimizer/operator/executor/TestSupportHiveMajorExecutor.java +++ b/optimizer/src/test/java/com/netease/arctic/optimizer/operator/executor/TestSupportHiveMajorExecutor.java @@ -23,11 +23,13 @@ import com.netease.arctic.ams.api.OptimizeType; import com.netease.arctic.data.DataTreeNode; import com.netease.arctic.data.file.ContentFileWithSequence; +import com.netease.arctic.hive.utils.HiveTableUtil; import com.netease.arctic.optimizer.OptimizerConfig; import com.netease.arctic.optimizer.util.ContentFileUtil; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.TableProperties; import com.netease.arctic.table.UnkeyedTable; +import com.netease.arctic.utils.TableFileUtils; import org.junit.Assert; import org.junit.Test; @@ -117,6 +119,26 @@ public void testUnKeyedTableFullMajorExecutor() throws Exception { }); } + @Test + public void testUnKeyedTableFullMajorExecutorCopyFiles() throws Exception { + insertTableBaseDataFiles(testHiveTable, null, baseDataFilesInfo); + NodeTask nodeTask = constructNodeTask(testHiveTable, OptimizeType.FullMajor); + nodeTask.setCopyFiles(true); + String hiveSubdirectory = HiveTableUtil.newHiveSubdirectory(); + nodeTask.setCustomHiveSubdirectory(hiveSubdirectory); + String[] arg = new String[0]; + OptimizerConfig optimizerConfig = new OptimizerConfig(arg); + optimizerConfig.setOptimizerId("UnitTest"); + MajorExecutor majorExecutor = new MajorExecutor(nodeTask, testHiveTable, System.currentTimeMillis(), optimizerConfig); + OptimizeTaskResult result = majorExecutor.execute(); + Assert.assertEquals(Iterables.size(result.getTargetFiles()), 10); + result.getTargetFiles().forEach(dataFile -> { + Assert.assertEquals(100, dataFile.recordCount()); + Assert.assertEquals(testHiveTable.hiveLocation() + "/name=name/" + hiveSubdirectory, + TableFileUtils.getFileDir(dataFile.path().toString())); + }); + } + @Test public void testNoPartitionTableMajorExecutor() throws Exception { insertBasePosDeleteFiles(testUnPartitionKeyedHiveTable, 2L, baseDataFilesInfo, posDeleteFilesInfo); From 51b1108d5a520f2eb3a769b769374b9c3559355e Mon Sep 17 00:00:00 2001 From: wangtao Date: Fri, 24 Nov 2023 17:54:51 +0800 Subject: [PATCH 03/10] support copying files for planning tasks --- .../optimize/AbstractArcticOptimizePlan.java | 21 ++++++++++++ .../ams/server/optimize/FullOptimizePlan.java | 8 +++-- .../optimize/SupportHiveFullOptimizePlan.java | 34 +++++++++++++++++++ 3 files changed, 61 insertions(+), 2 deletions(-) diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractArcticOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractArcticOptimizePlan.java index cbcb25814f..8162b84fb8 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractArcticOptimizePlan.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractArcticOptimizePlan.java @@ -35,6 +35,7 @@ import com.netease.arctic.data.file.WrapFileWithSequenceNumberHelper; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.ChangeTable; +import com.netease.arctic.table.TableProperties; import com.netease.arctic.table.UnkeyedTable; import com.netease.arctic.utils.SerializationUtils; import org.apache.iceberg.ContentFile; @@ -43,6 +44,7 @@ import org.apache.iceberg.FileContent; 
import org.apache.iceberg.FileScanTask; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.PropertyUtil; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -180,6 +182,10 @@ protected BasicOptimizeTask buildOptimizeTask(@Nullable List sourc String customHiveSubdirectory = taskConfig.getCustomHiveSubdirectory(); if (customHiveSubdirectory != null) { properties.put(OptimizeTaskProperties.CUSTOM_HIVE_SUB_DIRECTORY, customHiveSubdirectory); + if (insertFiles.isEmpty() && deleteFiles.isEmpty() && posDeleteFiles.isEmpty() && + allLargeFiles(baseFiles)) { + properties.put(OptimizeTaskProperties.ENABLE_COPY_FILES, true + ""); + } } if (taskConfig.isMoveFilesToHiveLocation()) { properties.put(OptimizeTaskProperties.MOVE_FILES_TO_HIVE_LOCATION, true + ""); @@ -195,6 +201,21 @@ protected void addOptimizeFiles() { completeTree(); } + private boolean allLargeFiles(List files) { + long largeFileSize = getLargeFileSize(); + return files.stream().allMatch(file -> file.fileSizeInBytes() >= largeFileSize); + } + + /** + * The large files are files whose size is greater than 9/10(default) of target size. + */ + protected long getLargeFileSize() { + long targetSize = PropertyUtil.propertyAsLong(arcticTable.properties(), + TableProperties.SELF_OPTIMIZING_TARGET_SIZE, + TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT); + return targetSize * 9 / 10; + } + private void completeTree() { partitionFileTree.values().forEach(FileTree::completeTree); } diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java index 9713948151..f5f71a291a 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java @@ -203,8 +203,7 @@ private List collectTasksWithBinPack(String partition, long t false, constructCustomHiveSubdirectory(baseFiles) ); - List> packed = new BinPacking.ListPacker(taskSize, Integer.MAX_VALUE, true) - .pack(baseFiles, DataFile::fileSizeInBytes); + List> packed = binPackFiles(taskSize, baseFiles, !posDeleteFiles.isEmpty()); for (List files : packed) { if (CollectionUtils.isNotEmpty(files)) { collector.add(buildOptimizeTask(null, @@ -216,6 +215,11 @@ false, constructCustomHiveSubdirectory(baseFiles) return collector; } + protected List> binPackFiles(long taskSize, List baseFiles, boolean deleteExist) { + return new BinPacking.ListPacker(taskSize, Integer.MAX_VALUE, true) + .pack(baseFiles, DataFile::fileSizeInBytes); + } + private long getOptimizingTargetSize() { return CompatiblePropertyUtil.propertyAsLong(arcticTable.properties(), TableProperties.SELF_OPTIMIZING_TARGET_SIZE, diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveFullOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveFullOptimizePlan.java index 8b27173f98..6215832c8a 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveFullOptimizePlan.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveFullOptimizePlan.java @@ -29,9 +29,11 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.util.BinPacking; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -113,4 +115,36 @@ protected boolean partitionNeedPlan(String partitionToPath) { protected boolean nodeTaskNeedBuild(String partition, List posDeleteFiles, List baseFiles) { return true; } + + @Override + protected List> binPackFiles(long taskSize, List baseFiles, boolean deleteExist) { + if (deleteExist) { + return super.binPackFiles(taskSize, baseFiles, deleteExist); + } else { + // The large files are files whose size is greater than 9/10 (by default) of the target size; the others are + // the remaining files. They are packed separately to avoid packing small files and large files together, + // because a task with only large files can be executed by copying files, which is more efficient than reading + // and writing files. For example, if we have 3 files of 120 MB, 8 MB, and 8 MB and the target size is 128 MB, + // packing them together yields 2 tasks: one with 120 MB + 8 MB and one with 8 MB. Packing them separately also + // yields 2 tasks: one with 120 MB and one with 8 MB + 8 MB, and the first task can be executed very efficiently + // by copying files. + List largeFiles = new ArrayList<>(); + List remainFiles = new ArrayList<>(); + long largeFileSize = getLargeFileSize(); + for (DataFile baseFile : baseFiles) { + if (baseFile.fileSizeInBytes() >= largeFileSize) { + largeFiles.add(baseFile); + } else { + remainFiles.add(baseFile); + } + } + List> packed = new ArrayList<>(); + packed.addAll(new BinPacking.ListPacker(taskSize, Integer.MAX_VALUE, true) + .pack(largeFiles, DataFile::fileSizeInBytes)); + packed.addAll(new BinPacking.ListPacker(taskSize, Integer.MAX_VALUE, true) + .pack(remainFiles, DataFile::fileSizeInBytes)); + + return packed; + } + } } From 5cd94a130601f1a24004461f2d12d4aecd4d6f1c Mon Sep 17 00:00:00 2001 From: wangtao Date: Fri, 24 Nov 2023 18:57:58 +0800 Subject: [PATCH 04/10] add test case --- .../ams/server/OptimizingIntegrationTest.java | 12 +++++++ .../optimize/MixedHiveOptimizingTest.java | 26 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/OptimizingIntegrationTest.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/OptimizingIntegrationTest.java index 488b32128c..019c098894 100644 --- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/OptimizingIntegrationTest.java +++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/OptimizingIntegrationTest.java @@ -13,6 +13,7 @@ import com.netease.arctic.table.TableBuilder; import com.netease.arctic.table.TableIdentifier; import com.netease.arctic.table.TableProperties; +import com.netease.arctic.table.UnkeyedTable; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -59,6 +60,7 @@ public class OptimizingIntegrationTest { private static final TableIdentifier TB_12 = TableIdentifier.of(ICEBERG_CATALOG, DATABASE, "iceberg_table12"); private static final TableIdentifier TB_13 = TableIdentifier.of(CATALOG, DATABASE, "test_table13"); private static final TableIdentifier TB_14 = TableIdentifier.of(CATALOG, DATABASE, "test_table14"); + private static final TableIdentifier TB_15 = TableIdentifier.of(HIVE_CATALOG, DATABASE, "hive_table15"); private static final ConcurrentHashMap catalogsCache = new ConcurrentHashMap<>(); @@ -219,6 +221,16 @@ public void testHiveKeyedTableMajorOptimizeAndMove() 
throws TException, IOExcept testCase.testHiveKeyedTableMajorOptimizeAndMove(); } + @Test + public void testHiveUnKeyedTableFullOptimizeCopyingFiles() throws TException, IOException { + createHiveArcticTable(TB_15, PrimaryKeySpec.noPrimaryKey(), PartitionSpec.unpartitioned()); + assertTableExist(TB_15); + UnkeyedTable table = catalog(HIVE_CATALOG).loadTable(TB_15).asUnkeyedTable(); + MixedHiveOptimizingTest testCase = + new MixedHiveOptimizingTest(table, TEST_HMS.getHiveClient(), getOptimizeHistoryStartId()); + testCase.testHiveUnKeyedTableFullOptimizeCopyingFiles(); + } + private static long getOptimizeHistoryStartId() { return ServiceContainer.getOptimizeService().maxOptimizeHistoryId(); } diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/MixedHiveOptimizingTest.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/MixedHiveOptimizingTest.java index d63fdf0d82..3a26148a73 100644 --- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/MixedHiveOptimizingTest.java +++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/MixedHiveOptimizingTest.java @@ -24,9 +24,11 @@ import com.netease.arctic.table.KeyedTable; import com.netease.arctic.table.TableIdentifier; import com.netease.arctic.table.TableProperties; +import com.netease.arctic.table.UnkeyedTable; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.data.Record; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -120,6 +122,30 @@ public void testHiveKeyedTableMajorOptimizeAndMove() throws TException, IOExcept assertOptimizeHangUp(tb, startId + offset); } + public void testHiveUnKeyedTableFullOptimizeCopyingFiles() throws TException, IOException { + int offset = 1; + UnkeyedTable table = arcticTable.asUnkeyedTable(); + TableIdentifier tb = table.id(); + // write 2 small files and 1 large file + updateProperties(arcticTable, TableProperties.ENABLE_SELF_OPTIMIZING, false + ""); + updateProperties(arcticTable, TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL, 1000 + ""); + writeBase(table, rangeFromTo(1, 2, "aaa", quickDateWithZone(3))); + writeBase(table, rangeFromTo(3, 4, "aaa", quickDateWithZone(3))); + List largeFile = writeBase(table, rangeFromTo(5, 5000, "aaa", quickDateWithZone(3))); + long targetFileSize = (largeFile.get(0).fileSizeInBytes() - 10) * 10 / 9; + updateProperties(arcticTable, TableProperties.SELF_OPTIMIZING_TARGET_SIZE, targetFileSize + ""); + + // wait Full Optimize result + updateProperties(arcticTable, TableProperties.ENABLE_SELF_OPTIMIZING, true + ""); + OptimizeHistory optimizeHistory = waitOptimizeResult(tb, startId + offset++); + assertOptimizeHistory(optimizeHistory, OptimizeType.FullMajor, 3, 2); + assertIdRange(readRecords(table), 1, 5000); + // assert file are in hive location + assertIdRange(readHiveTableData(), 1, 5000); + + assertOptimizeHangUp(tb, startId + offset); + } + private Record newRecord(Object... 
val) { return newRecord(arcticTable.schema(), val); } From 00b3d586d10c7c0932f473b25b7c40b154053830 Mon Sep 17 00:00:00 2001 From: wangtao Date: Mon, 27 Nov 2023 19:41:32 +0800 Subject: [PATCH 05/10] optimizing executor using a bigger target size if inputSize <= (targetSize + fragmentSize) --- .../operator/executor/AbstractExecutor.java | 34 +++++++++++++++++++ .../operator/executor/IcebergExecutor.java | 9 ++--- .../operator/executor/MajorExecutor.java | 9 ++--- 3 files changed, 44 insertions(+), 8 deletions(-) diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/AbstractExecutor.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/AbstractExecutor.java index 03cfc7d081..00a9816907 100644 --- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/AbstractExecutor.java +++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/AbstractExecutor.java @@ -33,6 +33,7 @@ import com.netease.arctic.utils.map.StructLikeCollections; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.util.PropertyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -129,4 +130,37 @@ protected void checkIfTimeout(Closeable writer) throws Exception { actualExecuteTime, task.getMaxExecuteTime(), factor)); } } + + /** + * Estimates a larger max target file size than the target size to avoid creating tiny remainder + * files. + * + *
<p>While we create tasks that should all be smaller than our target size, there is a chance + * that the actual data will end up being larger than our target size due to various factors, such as + * compression and serialization, which are outside our control. If this occurs, instead of making a + * single file that is close in size to our target, we would end up producing one file of the + * target size, and then a small extra file with the remaining data. + * + *
<p>For example, if our target is 128 MB, we may generate a rewrite task that should be 120 MB. + * When we write the data we may find we actually have to write out 138 MB. If we use the target + * size while writing, we would produce a 128 MB file and a 10 MB file. If instead we use a + * larger size estimated by this method, then we end up writing a single file. + */ + protected long targetFileSize(long inputSize) { + long targetFileSize = PropertyUtil.propertyAsLong(table.properties(), + TableProperties.SELF_OPTIMIZING_TARGET_SIZE, + TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT); + long maxTargetFileSize = getMaxTargetFileSize(targetFileSize); + if (inputSize <= maxTargetFileSize) { + return maxTargetFileSize; + } else { + return targetFileSize; + } + } + + private long getMaxTargetFileSize(long targetFileSize) { + int fragmentRatio = PropertyUtil.propertyAsInt(table.properties(), TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO, + TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT); + return targetFileSize + targetFileSize / fragmentRatio; + } } diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/IcebergExecutor.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/IcebergExecutor.java index b8870b8f24..89e59cc89b 100644 --- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/IcebergExecutor.java +++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/IcebergExecutor.java @@ -41,7 +41,6 @@ import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.util.PropertyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,6 +124,10 @@ private List> optimizeDeleteFiles() throws Exception { return icebergPosDeleteWriter.complete(); } + protected long inputSize() { + return task.allIcebergDataFiles().stream().mapToLong(ContentFile::fileSizeInBytes).sum(); + } + private CombinedIcebergScanTask buildIcebergScanTask() { return new CombinedIcebergScanTask(task.allIcebergDataFiles().toArray(new DataFileWithSequence[0]), task.allIcebergDeleteFiles().toArray(new DeleteFileWithSequence[0]), @@ -138,9 +141,7 @@ private List> optimizeDataFiles() throws Exception { false, IdentityPartitionConverters::convertConstant, false, structLikeCollections); String formatAsString = table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); - long targetSizeByBytes = PropertyUtil.propertyAsLong(table.properties(), - com.netease.arctic.table.TableProperties.SELF_OPTIMIZING_TARGET_SIZE, - com.netease.arctic.table.TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT); + long targetSizeByBytes = targetFileSize(inputSize()); OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table.asUnkeyedTable(), table.spec().specId(), task.getAttemptId()).build(); diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java index 17f5e275eb..be274a3729 100644 --- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java +++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java @@ -48,7 +48,6 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; 
-import org.apache.iceberg.util.PropertyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +64,10 @@ public MajorExecutor(NodeTask nodeTask, ArcticTable table, long startTime, Optim super(nodeTask, table, startTime, config); } + protected long inputSize() { + return task.dataFiles().stream().mapToLong(DataFile::fileSizeInBytes).sum(); + } + @Override public OptimizeTaskResult execute() throws Exception { Iterable targetFiles; @@ -125,9 +128,7 @@ private Iterable optimizeTable(CloseableIterator recordIterato } else { transactionId = null; } - long targetFileSize = PropertyUtil.propertyAsLong(table.properties(), - com.netease.arctic.table.TableProperties.SELF_OPTIMIZING_TARGET_SIZE, - com.netease.arctic.table.TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT); + long targetFileSize = targetFileSize(inputSize()); TaskWriter writer = AdaptHiveGenericTaskWriterBuilder.builderFor(table) .withTransactionId(transactionId) .withTaskId(task.getAttemptId()) From 0185d7cd40b5da934ea05ad384b44fffd5710489 Mon Sep 17 00:00:00 2001 From: wangtao Date: Mon, 27 Nov 2023 19:47:48 +0800 Subject: [PATCH 06/10] optimizer copy files with '.' as prefix --- .../arctic/optimizer/operator/executor/MajorExecutor.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java index be274a3729..1a01d747ed 100644 --- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java +++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java @@ -107,7 +107,12 @@ private Iterable copyFiles(List dataFiles, String cu table.spec(), dataFile.partition(), customHiveSubdirectory); String sourcePath = dataFile.path().toString(); - String targetPath = TableFileUtils.getNewFilePath(hiveLocation, sourcePath); + String targetPath; + if (sourcePath.startsWith(".")) { + targetPath = TableFileUtils.getNewFilePath(hiveLocation, sourcePath); + } else { + targetPath = TableFileUtils.getNewFilePath(hiveLocation, "." 
+ sourcePath); + } LOG.info("Start copy file from {} to {}", sourcePath, targetPath); long startTime = System.currentTimeMillis(); table.io().copy(sourcePath, targetPath); From 27b66149ab2c86b1e6c53a63aac958aaf77da336 Mon Sep 17 00:00:00 2001 From: wangtao Date: Mon, 27 Nov 2023 21:11:23 +0800 Subject: [PATCH 07/10] fix coping files --- .../operator/executor/MajorExecutor.java | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java index 1a01d747ed..61089c3224 100644 --- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java +++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java @@ -47,10 +47,11 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.TaskWriter; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -84,7 +85,7 @@ public OptimizeTaskResult execute() throws Exception { } else { LOG.info("Task {} of {} enables copy files, copy {} data files", task.getTaskId(), task.getTableIdentifier(), task.dataFiles().size()); - targetFiles = table.io().doAs(() -> copyFiles(task.dataFiles(), task.getCustomHiveSubdirectory())); + targetFiles = copyFiles(task.dataFiles(), task.getCustomHiveSubdirectory()); return buildOptimizeResult(targetFiles); } } @@ -102,24 +103,29 @@ public OptimizeTaskResult execute() throws Exception { } private Iterable copyFiles(List dataFiles, String customHiveSubdirectory) { - return Iterables.transform(dataFiles, dataFile -> { + int count = 0; + List targetFiles = new ArrayList<>(dataFiles.size()); + for (PrimaryKeyedFile dataFile : dataFiles) { String hiveLocation = HiveTableUtil.newHiveDataLocation(((SupportHive) table).hiveLocation(), table.spec(), dataFile.partition(), customHiveSubdirectory); String sourcePath = dataFile.path().toString(); + String sourceFileName = TableFileUtils.getFileName(sourcePath); + String targetPath; - if (sourcePath.startsWith(".")) { - targetPath = TableFileUtils.getNewFilePath(hiveLocation, sourcePath); + if (sourceFileName.startsWith(".")) { + targetPath = hiveLocation + File.separator + sourceFileName; } else { - targetPath = TableFileUtils.getNewFilePath(hiveLocation, "." + sourcePath); + targetPath = hiveLocation + File.separator + "." 
+ sourceFileName; } - LOG.info("Start copy file from {} to {}", sourcePath, targetPath); + LOG.info("[{}] Start copying file from {} to {}", count++, sourcePath, targetPath); long startTime = System.currentTimeMillis(); table.io().copy(sourcePath, targetPath); - LOG.info("Successfully copy file to {}, cost {} ms", targetPath, + LOG.info("Successfully copied file {}, cost {} ms", sourceFileName, System.currentTimeMillis() - startTime); - return DataFiles.builder(table.spec()).copy(dataFile).withPath(targetPath).build(); - }); + targetFiles.add(DataFiles.builder(table.spec()).copy(dataFile).withPath(targetPath).build()); + } + return targetFiles; } @Override From 46aa3514c952671b6fe80b9f28b42651fafa1cd4 Mon Sep 17 00:00:00 2001 From: wangtao Date: Thu, 30 Nov 2023 16:49:55 +0800 Subject: [PATCH 08/10] change error message --- .../src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java b/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java index 7b3dfef68b..9dfe0a1fa0 100644 --- a/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java +++ b/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java @@ -192,7 +192,7 @@ public void copy(String src, String dts) { try { FileUtil.copy(fs, srcPath, fs, dtsPath, false, true, conf()); } catch (IOException e) { - throw new UncheckedIOException("Fail to rename: from " + src + " to " + dts, e); + throw new UncheckedIOException("Fail to copy: from " + src + " to " + dts, e); } return null; }); From a2a8eacb1f921c432e88ecb9357ebb420fb4fee2 Mon Sep 17 00:00:00 2001 From: wangtao Date: Thu, 30 Nov 2023 17:07:33 +0800 Subject: [PATCH 09/10] change property name to copy-files-to-new-location --- .../arctic/ams/api/properties/OptimizeTaskProperties.java | 2 +- .../arctic/ams/server/optimize/AbstractArcticOptimizePlan.java | 2 +- .../com/netease/arctic/optimizer/operator/BaseTaskExecutor.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ams/ams-api/src/main/java/com/netease/arctic/ams/api/properties/OptimizeTaskProperties.java b/ams/ams-api/src/main/java/com/netease/arctic/ams/api/properties/OptimizeTaskProperties.java index f0b2984ddc..c4a3c87168 100644 --- a/ams/ams-api/src/main/java/com/netease/arctic/ams/api/properties/OptimizeTaskProperties.java +++ b/ams/ams-api/src/main/java/com/netease/arctic/ams/api/properties/OptimizeTaskProperties.java @@ -4,7 +4,7 @@ public class OptimizeTaskProperties { // optimize task properties public static final String ALL_FILE_COUNT = "all-file-cnt"; public static final String CUSTOM_HIVE_SUB_DIRECTORY = "custom-hive-sub-directory"; - public static final String ENABLE_COPY_FILES= "copy-files-enabled"; + public static final String COPY_FILES_TO_NEW_LOCATION = "copy-files-to-new-location"; public static final String MAX_EXECUTE_TIME = "max-execute-time"; public static final String MOVE_FILES_TO_HIVE_LOCATION = "move-files-to-hive-location"; } diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractArcticOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractArcticOptimizePlan.java index 8162b84fb8..515b2dec82 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractArcticOptimizePlan.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractArcticOptimizePlan.java @@ -184,7 +184,7 @@ protected BasicOptimizeTask 
buildOptimizeTask(@Nullable List sourc properties.put(OptimizeTaskProperties.CUSTOM_HIVE_SUB_DIRECTORY, customHiveSubdirectory); if (insertFiles.isEmpty() && deleteFiles.isEmpty() && posDeleteFiles.isEmpty() && allLargeFiles(baseFiles)) { - properties.put(OptimizeTaskProperties.ENABLE_COPY_FILES, true + ""); + properties.put(OptimizeTaskProperties.COPY_FILES_TO_NEW_LOCATION, true + ""); } } if (taskConfig.isMoveFilesToHiveLocation()) { diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskExecutor.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskExecutor.java index 79a076d1ca..d47f877068 100644 --- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskExecutor.java +++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskExecutor.java @@ -234,7 +234,7 @@ private NodeTask constructTask(ArcticTable table, OptimizeTask task, int attempt String customHiveSubdirectory = properties.get(OptimizeTaskProperties.CUSTOM_HIVE_SUB_DIRECTORY); nodeTask.setCustomHiveSubdirectory(customHiveSubdirectory); nodeTask.setCopyFiles( - PropertyUtil.propertyAsBoolean(properties, OptimizeTaskProperties.ENABLE_COPY_FILES, false)); + PropertyUtil.propertyAsBoolean(properties, OptimizeTaskProperties.COPY_FILES_TO_NEW_LOCATION, false)); Long maxExecuteTime = PropertyUtil.propertyAsLong(properties, OptimizeTaskProperties.MAX_EXECUTE_TIME, TableProperties.SELF_OPTIMIZING_EXECUTE_TIMEOUT_DEFAULT); From f1d25da53dc64cb61440a5945ac54a70c6746180 Mon Sep 17 00:00:00 2001 From: wangtao Date: Fri, 1 Dec 2023 11:30:31 +0800 Subject: [PATCH 10/10] modify logs and code --- .../operator/executor/MajorExecutor.java | 37 +++++++------------ 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java index 61089c3224..b50837bef1 100644 --- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java +++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java @@ -74,20 +74,11 @@ public OptimizeTaskResult execute() throws Exception { Iterable targetFiles; LOG.info("Start processing arctic table major optimize task {} of {}: {}", task.getTaskId(), task.getTableIdentifier(), task); - if (TableTypeUtil.isHive(table) && task.isCopyFiles()) { - if (!task.posDeleteFiles().isEmpty() || !task.deleteFiles().isEmpty()) { - LOG.info("Task {} of {} enables copy files, but has delete files {} {}, should not copy", task.getTaskId(), - task.getTableIdentifier(), task.posDeleteFiles().size(), task.deleteFiles().size()); - } else if (task.getCustomHiveSubdirectory() == null) { - LOG.info("Task {} of {} enables copy files, but custom hive directory is null, should not copy", - task.getTaskId(), - task.getTableIdentifier()); - } else { - LOG.info("Task {} of {} enables copy files, copy {} data files", task.getTaskId(), task.getTableIdentifier(), - task.dataFiles().size()); - targetFiles = copyFiles(task.dataFiles(), task.getCustomHiveSubdirectory()); - return buildOptimizeResult(targetFiles); - } + if (supportCopyFiles()) { + LOG.info("Task {} of {} enables copy files, copy {} data files", task.getTaskId(), task.getTableIdentifier(), + task.dataFiles().size()); + targetFiles = copyFiles(task.dataFiles(), task.getCustomHiveSubdirectory()); + return buildOptimizeResult(targetFiles); } Map> deleteFileMap = 
groupDeleteFilesByNode(task.posDeleteFiles()); @@ -102,8 +93,13 @@ public OptimizeTaskResult execute() throws Exception { return buildOptimizeResult(targetFiles); } + private boolean supportCopyFiles() { + return TableTypeUtil.isHive(table) && task.isCopyFiles() && task.getCustomHiveSubdirectory() != null && + task.posDeleteFiles().isEmpty() && task.deleteFiles().isEmpty(); + } + private Iterable copyFiles(List dataFiles, String customHiveSubdirectory) { - int count = 0; + // Only mixed hive tables support copying files List targetFiles = new ArrayList<>(dataFiles.size()); for (PrimaryKeyedFile dataFile : dataFiles) { String hiveLocation = HiveTableUtil.newHiveDataLocation(((SupportHive) table).hiveLocation(), @@ -112,16 +108,11 @@ private Iterable copyFiles(List dataFiles, String cu String sourcePath = dataFile.path().toString(); String sourceFileName = TableFileUtils.getFileName(sourcePath); - String targetPath; - if (sourceFileName.startsWith(".")) { - targetPath = hiveLocation + File.separator + sourceFileName; - } else { - targetPath = hiveLocation + File.separator + "." + sourceFileName; - } - LOG.info("[{}] Start copying file from {} to {}", count++, sourcePath, targetPath); + // The copied files must be hidden files, which start with `.` + String targetPath = hiveLocation + File.separator + "." + sourceFileName; long startTime = System.currentTimeMillis(); table.io().copy(sourcePath, targetPath); - LOG.info("Successfully copied file {}, cost {} ms", sourceFileName, + LOG.info("Successfully copied file {}, cost {} ms", sourcePath, System.currentTimeMillis() - startTime); targetFiles.add(DataFiles.builder(table.spec()).copy(dataFile).withPath(targetPath).build()); }