diff --git a/ams/ams-api/src/main/java/com/netease/arctic/ams/api/properties/OptimizeTaskProperties.java b/ams/ams-api/src/main/java/com/netease/arctic/ams/api/properties/OptimizeTaskProperties.java
index a787fa36df..c4a3c87168 100644
--- a/ams/ams-api/src/main/java/com/netease/arctic/ams/api/properties/OptimizeTaskProperties.java
+++ b/ams/ams-api/src/main/java/com/netease/arctic/ams/api/properties/OptimizeTaskProperties.java
@@ -4,6 +4,7 @@ public class OptimizeTaskProperties {
   // optimize task properties
   public static final String ALL_FILE_COUNT = "all-file-cnt";
   public static final String CUSTOM_HIVE_SUB_DIRECTORY = "custom-hive-sub-directory";
+  public static final String COPY_FILES_TO_NEW_LOCATION = "copy-files-to-new-location";
   public static final String MAX_EXECUTE_TIME = "max-execute-time";
   public static final String MOVE_FILES_TO_HIVE_LOCATION = "move-files-to-hive-location";
 }
diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractArcticOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractArcticOptimizePlan.java
index cbcb25814f..515b2dec82 100644
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractArcticOptimizePlan.java
+++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/AbstractArcticOptimizePlan.java
@@ -35,6 +35,7 @@
 import com.netease.arctic.data.file.WrapFileWithSequenceNumberHelper;
 import com.netease.arctic.table.ArcticTable;
 import com.netease.arctic.table.ChangeTable;
+import com.netease.arctic.table.TableProperties;
 import com.netease.arctic.table.UnkeyedTable;
 import com.netease.arctic.utils.SerializationUtils;
 import org.apache.iceberg.ContentFile;
@@ -43,6 +44,7 @@
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.PropertyUtil;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -180,6 +182,10 @@ protected BasicOptimizeTask buildOptimizeTask(@Nullable List sourc
     String customHiveSubdirectory = taskConfig.getCustomHiveSubdirectory();
     if (customHiveSubdirectory != null) {
       properties.put(OptimizeTaskProperties.CUSTOM_HIVE_SUB_DIRECTORY, customHiveSubdirectory);
+      if (insertFiles.isEmpty() && deleteFiles.isEmpty() && posDeleteFiles.isEmpty() &&
+          allLargeFiles(baseFiles)) {
+        properties.put(OptimizeTaskProperties.COPY_FILES_TO_NEW_LOCATION, true + "");
+      }
     }
     if (taskConfig.isMoveFilesToHiveLocation()) {
       properties.put(OptimizeTaskProperties.MOVE_FILES_TO_HIVE_LOCATION, true + "");
@@ -195,6 +201,21 @@ protected void addOptimizeFiles() {
     completeTree();
   }
 
+  private boolean allLargeFiles(List<DataFile> files) {
+    long largeFileSize = getLargeFileSize();
+    return files.stream().allMatch(file -> file.fileSizeInBytes() >= largeFileSize);
+  }
+
+  /**
+   * Large files are files whose size is greater than 9/10 (by default) of the target size.
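+   * Tasks that contain only such large files can be optimized by copying the files instead of rewriting them.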
+   */
+  protected long getLargeFileSize() {
+    long targetSize = PropertyUtil.propertyAsLong(arcticTable.properties(),
+        TableProperties.SELF_OPTIMIZING_TARGET_SIZE,
+        TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT);
+    return targetSize * 9 / 10;
+  }
+
   private void completeTree() {
     partitionFileTree.values().forEach(FileTree::completeTree);
   }
diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java
index 9713948151..f5f71a291a 100644
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java
+++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java
@@ -203,8 +203,7 @@ private List collectTasksWithBinPack(String partition, long t
         false, constructCustomHiveSubdirectory(baseFiles)
     );
-    List<List<DataFile>> packed = new BinPacking.ListPacker<DataFile>(taskSize, Integer.MAX_VALUE, true)
-        .pack(baseFiles, DataFile::fileSizeInBytes);
+    List<List<DataFile>> packed = binPackFiles(taskSize, baseFiles, !posDeleteFiles.isEmpty());
     for (List<DataFile> files : packed) {
       if (CollectionUtils.isNotEmpty(files)) {
         collector.add(buildOptimizeTask(null,
@@ -216,6 +215,11 @@ false, constructCustomHiveSubdirectory(baseFiles)
     return collector;
   }
 
+  protected List<List<DataFile>> binPackFiles(long taskSize, List<DataFile> baseFiles, boolean deleteExist) {
+    return new BinPacking.ListPacker<DataFile>(taskSize, Integer.MAX_VALUE, true)
+        .pack(baseFiles, DataFile::fileSizeInBytes);
+  }
+
   private long getOptimizingTargetSize() {
     return CompatiblePropertyUtil.propertyAsLong(arcticTable.properties(),
         TableProperties.SELF_OPTIMIZING_TARGET_SIZE,
diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveFullOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveFullOptimizePlan.java
index 8b27173f98..6215832c8a 100644
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveFullOptimizePlan.java
+++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveFullOptimizePlan.java
@@ -29,9 +29,11 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.util.BinPacking;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -113,4 +115,36 @@ protected boolean partitionNeedPlan(String partitionToPath) {
   protected boolean nodeTaskNeedBuild(String partition, List<DeleteFile> posDeleteFiles, List<DataFile> baseFiles) {
     return true;
   }
+
+  @Override
+  protected List<List<DataFile>> binPackFiles(long taskSize, List<DataFile> baseFiles, boolean deleteExist) {
+    if (deleteExist) {
+      return super.binPackFiles(taskSize, baseFiles, deleteExist);
+    } else {
+      // Large files are files whose size is greater than 9/10 (by default) of the target size; the others are the
+      // remaining files. They are packed separately so that small files and large files do not end up in the same
+      // task, because a task that contains only large files can be executed by copying files, which is more
+      // efficient than reading and rewriting the data. For example, given 3 files of 120 MB, 8 MB and 8 MB and a
+      // target size of 128 MB, packing them together produces 2 tasks: one of 120 MB + 8 MB and one of 8 MB.
+      // Packing them separately also produces 2 tasks: one of 120 MB and one of 8 MB + 8 MB, and the first task
+      // can be executed very efficiently by copying files.
+      List<DataFile> largeFiles = new ArrayList<>();
+      List<DataFile> remainFiles = new ArrayList<>();
+      long largeFileSize = getLargeFileSize();
+      for (DataFile baseFile : baseFiles) {
+        if (baseFile.fileSizeInBytes() >= largeFileSize) {
+          largeFiles.add(baseFile);
+        } else {
+          remainFiles.add(baseFile);
+        }
+      }
+      List<List<DataFile>> packed = new ArrayList<>();
+      packed.addAll(new BinPacking.ListPacker<DataFile>(taskSize, Integer.MAX_VALUE, true)
+          .pack(largeFiles, DataFile::fileSizeInBytes));
+      packed.addAll(new BinPacking.ListPacker<DataFile>(taskSize, Integer.MAX_VALUE, true)
+          .pack(remainFiles, DataFile::fileSizeInBytes));
+
+      return packed;
+    }
+  }
 }
diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/OptimizingIntegrationTest.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/OptimizingIntegrationTest.java
index 488b32128c..019c098894 100644
--- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/OptimizingIntegrationTest.java
+++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/OptimizingIntegrationTest.java
@@ -13,6 +13,7 @@
 import com.netease.arctic.table.TableBuilder;
 import com.netease.arctic.table.TableIdentifier;
 import com.netease.arctic.table.TableProperties;
+import com.netease.arctic.table.UnkeyedTable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -59,6 +60,7 @@ public class OptimizingIntegrationTest {
   private static final TableIdentifier TB_12 = TableIdentifier.of(ICEBERG_CATALOG, DATABASE, "iceberg_table12");
   private static final TableIdentifier TB_13 = TableIdentifier.of(CATALOG, DATABASE, "test_table13");
   private static final TableIdentifier TB_14 = TableIdentifier.of(CATALOG, DATABASE, "test_table14");
+  private static final TableIdentifier TB_15 = TableIdentifier.of(HIVE_CATALOG, DATABASE, "hive_table15");
 
   private static final ConcurrentHashMap catalogsCache = new ConcurrentHashMap<>();
 
@@ -219,6 +221,16 @@ public void testHiveKeyedTableMajorOptimizeAndMove() throws TException, IOExcept
     testCase.testHiveKeyedTableMajorOptimizeAndMove();
   }
 
+  @Test
+  public void testHiveUnKeyedTableFullOptimizeCopyingFiles() throws TException, IOException {
+    createHiveArcticTable(TB_15, PrimaryKeySpec.noPrimaryKey(), PartitionSpec.unpartitioned());
+    assertTableExist(TB_15);
+    UnkeyedTable table = catalog(HIVE_CATALOG).loadTable(TB_15).asUnkeyedTable();
+    MixedHiveOptimizingTest testCase =
+        new MixedHiveOptimizingTest(table, TEST_HMS.getHiveClient(), getOptimizeHistoryStartId());
+    testCase.testHiveUnKeyedTableFullOptimizeCopyingFiles();
+  }
+
   private static long getOptimizeHistoryStartId() {
     return ServiceContainer.getOptimizeService().maxOptimizeHistoryId();
   }
diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/MixedHiveOptimizingTest.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/MixedHiveOptimizingTest.java
index d63fdf0d82..3a26148a73 100644
--- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/MixedHiveOptimizingTest.java
+++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/MixedHiveOptimizingTest.java
@@ -24,9 +24,11 @@
 import com.netease.arctic.table.KeyedTable;
 import com.netease.arctic.table.TableIdentifier;
 import com.netease.arctic.table.TableProperties;
+import com.netease.arctic.table.UnkeyedTable;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -120,6 +122,30 @@ public void testHiveKeyedTableMajorOptimizeAndMove() throws TException, IOExcept
     assertOptimizeHangUp(tb, startId + offset);
   }
 
+  public void testHiveUnKeyedTableFullOptimizeCopyingFiles() throws TException, IOException {
+    int offset = 1;
+    UnkeyedTable table = arcticTable.asUnkeyedTable();
+    TableIdentifier tb = table.id();
+    // write 2 small files and 1 large file
+    updateProperties(arcticTable, TableProperties.ENABLE_SELF_OPTIMIZING, false + "");
+    updateProperties(arcticTable, TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL, 1000 + "");
+    writeBase(table, rangeFromTo(1, 2, "aaa", quickDateWithZone(3)));
+    writeBase(table, rangeFromTo(3, 4, "aaa", quickDateWithZone(3)));
+    List<DataFile> largeFile = writeBase(table, rangeFromTo(5, 5000, "aaa", quickDateWithZone(3)));
+    long targetFileSize = (largeFile.get(0).fileSizeInBytes() - 10) * 10 / 9;
+    updateProperties(arcticTable, TableProperties.SELF_OPTIMIZING_TARGET_SIZE, targetFileSize + "");
+
+    // wait for the full optimize result
+    updateProperties(arcticTable, TableProperties.ENABLE_SELF_OPTIMIZING, true + "");
+    OptimizeHistory optimizeHistory = waitOptimizeResult(tb, startId + offset++);
+    assertOptimizeHistory(optimizeHistory, OptimizeType.FullMajor, 3, 2);
+    assertIdRange(readRecords(table), 1, 5000);
+    // assert the files are in the hive location
+    assertIdRange(readHiveTableData(), 1, 5000);
+
+    assertOptimizeHangUp(tb, startId + offset);
+  }
+
   private Record newRecord(Object... val) {
     return newRecord(arcticTable.schema(), val);
   }
diff --git a/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java b/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java
index fa9ccbb7d7..ddafc895aa 100644
--- a/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java
+++ b/core/src/main/java/com/netease/arctic/io/ArcticFileIO.java
@@ -62,6 +62,13 @@ public interface ArcticFileIO extends FileIO, Configurable {
    */
   void rename(String oldPath, String newPath);
 
+  /**
+   * Copy a file from the old path to the new path.
+   * @param oldPath - source path
+   * @param newPath - target path
+   */
+  void copy(String oldPath, String newPath);
+
   /** Delete a directory recursively
    *
    * @param path the path to delete.
diff --git a/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java b/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java
index 8e7ca1b879..9dfe0a1fa0 100644
--- a/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java
+++ b/core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java
@@ -21,6 +21,7 @@
 import com.netease.arctic.table.TableMetaStore;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -179,6 +180,24 @@ public void rename(String src, String dts) {
     });
   }
 
+  @Override
+  public void copy(String src, String dts) {
+    tableMetaStore.doAs(() -> {
+      Path srcPath = new Path(src);
+      Path dtsPath = new Path(dts);
+      FileSystem fs = getFs(srcPath);
+      if (fs.isDirectory(srcPath)) {
+        throw new IllegalArgumentException("Cannot copy a directory: " + src);
+      }
+      try {
+        FileUtil.copy(fs, srcPath, fs, dtsPath, false, true, conf());
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to copy from " + src + " to " + dts, e);
+      }
+      return null;
+    });
+  }
+
   @Override
   public <T> T doAs(Callable<T> callable) {
     return tableMetaStore.doAs(callable);
diff --git a/core/src/main/java/com/netease/arctic/io/RecoverableArcticFileIO.java b/core/src/main/java/com/netease/arctic/io/RecoverableArcticFileIO.java
index 52e6f7db6c..82befa11b6 100644
--- a/core/src/main/java/com/netease/arctic/io/RecoverableArcticFileIO.java
+++ b/core/src/main/java/com/netease/arctic/io/RecoverableArcticFileIO.java
@@ -55,6 +55,11 @@ public void rename(String oldPath, String newPath) {
     fileIO.rename(oldPath, newPath);
   }
 
+  @Override
+  public void copy(String oldPath, String newPath) {
+    fileIO.copy(oldPath, newPath);
+  }
+
   @Override
   public void deleteDirectoryRecursively(String path) {
     //Do not move trash when deleting directory as it is used for dropping table only
diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskExecutor.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskExecutor.java
index 494fed195f..d47f877068 100644
--- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskExecutor.java
+++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskExecutor.java
@@ -233,6 +233,8 @@ private NodeTask constructTask(ArcticTable table, OptimizeTask task, int attempt
     String customHiveSubdirectory = properties.get(OptimizeTaskProperties.CUSTOM_HIVE_SUB_DIRECTORY);
     nodeTask.setCustomHiveSubdirectory(customHiveSubdirectory);
+    nodeTask.setCopyFiles(
+        PropertyUtil.propertyAsBoolean(properties, OptimizeTaskProperties.COPY_FILES_TO_NEW_LOCATION, false));
 
     Long maxExecuteTime = PropertyUtil.propertyAsLong(properties, OptimizeTaskProperties.MAX_EXECUTE_TIME,
         TableProperties.SELF_OPTIMIZING_EXECUTE_TIMEOUT_DEFAULT);
diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/AbstractExecutor.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/AbstractExecutor.java
index 03cfc7d081..00a9816907 100644
--- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/AbstractExecutor.java
+++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/AbstractExecutor.java
@@ -33,6 +33,7 @@
 import com.netease.arctic.utils.map.StructLikeCollections;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.util.PropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,4 +130,37 @@ protected void checkIfTimeout(Closeable writer) throws Exception {
           actualExecuteTime, task.getMaxExecuteTime(), factor));
     }
   }
+
+  /**
+   * Estimates a larger max target file size than the target size to avoid creating tiny remainder
+   * files.
+   *
+   * <p>While we create tasks that should all be smaller than our target size, there is a chance
+   * that the actual data will end up being larger than our target size due to various factors of
+   * compression and serialization, which are outside our control. If this occurs, instead of making a
+   * single file that is close in size to our target, we would end up producing one file of the
+   * target size, and then a small extra file with the remaining data.
+   *
+   * <p>For example, if our target is 128 MB, we may generate a rewrite task that should be 120 MB.
+   * When we write the data we may find we actually have to write out 138 MB. If we use the target
+   * size while writing, we would produce a 128 MB file and a 10 MB file. If instead we use a
+   * larger size estimated by this method, then we end up writing a single file.
+   */
+  protected long targetFileSize(long inputSize) {
+    long targetFileSize = PropertyUtil.propertyAsLong(table.properties(),
+        TableProperties.SELF_OPTIMIZING_TARGET_SIZE,
+        TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT);
+    long maxTargetFileSize = getMaxTargetFileSize(targetFileSize);
+    if (inputSize <= maxTargetFileSize) {
+      return maxTargetFileSize;
+    } else {
+      return targetFileSize;
+    }
+  }
+
+  private long getMaxTargetFileSize(long targetFileSize) {
+    int fragmentRatio = PropertyUtil.propertyAsInt(table.properties(), TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO,
+        TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT);
+    return targetFileSize + targetFileSize / fragmentRatio;
+  }
 }
diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/IcebergExecutor.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/IcebergExecutor.java
index b8870b8f24..89e59cc89b 100644
--- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/IcebergExecutor.java
+++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/IcebergExecutor.java
@@ -41,7 +41,6 @@
 import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.util.PropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -125,6 +124,10 @@ private List> optimizeDeleteFiles() throws Exception {
     return icebergPosDeleteWriter.complete();
   }
 
+  protected long inputSize() {
+    return task.allIcebergDataFiles().stream().mapToLong(ContentFile::fileSizeInBytes).sum();
+  }
+
   private CombinedIcebergScanTask buildIcebergScanTask() {
     return new CombinedIcebergScanTask(task.allIcebergDataFiles().toArray(new DataFileWithSequence[0]),
         task.allIcebergDeleteFiles().toArray(new DeleteFileWithSequence[0]),
@@ -138,9 +141,7 @@ private List> optimizeDataFiles() throws Exception {
         false, IdentityPartitionConverters::convertConstant, false, structLikeCollections);
     String formatAsString = table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
 
-    long targetSizeByBytes = PropertyUtil.propertyAsLong(table.properties(),
-        com.netease.arctic.table.TableProperties.SELF_OPTIMIZING_TARGET_SIZE,
-        com.netease.arctic.table.TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT);
+    long targetSizeByBytes = targetFileSize(inputSize());
 
     OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table.asUnkeyedTable(),
         table.spec().specId(), task.getAttemptId()).build();
diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java
index b7b73d47fe..b50837bef1 100644
--- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java
+++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/MajorExecutor.java
@@ -24,6 +24,9 @@
 import com.netease.arctic.data.PrimaryKeyedFile;
 import com.netease.arctic.hive.io.reader.AdaptHiveGenericArcticDataReader;
 import com.netease.arctic.hive.io.writer.AdaptHiveGenericTaskWriterBuilder;
+import com.netease.arctic.hive.table.SupportHive;
+import com.netease.arctic.hive.utils.HiveTableUtil;
+import com.netease.arctic.hive.utils.TableTypeUtil;
 import com.netease.arctic.optimizer.OptimizerConfig;
 import com.netease.arctic.scan.ArcticFileScanTask;
 import com.netease.arctic.scan.BasicArcticFileScanTask;
@@ -33,8 +36,10 @@
 import com.netease.arctic.table.KeyedTable;
 import com.netease.arctic.table.PrimaryKeySpec;
 import com.netease.arctic.table.WriteOperationKind;
+import com.netease.arctic.utils.TableFileUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableProperties;
@@ -42,10 +47,11 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.TaskWriter;
-import org.apache.iceberg.util.PropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -59,11 +65,21 @@ public MajorExecutor(NodeTask nodeTask, ArcticTable table, long startTime, Optim
     super(nodeTask, table, startTime, config);
   }
 
+  protected long inputSize() {
+    return task.dataFiles().stream().mapToLong(DataFile::fileSizeInBytes).sum();
+  }
+
   @Override
   public OptimizeTaskResult execute() throws Exception {
     Iterable<DataFile> targetFiles;
     LOG.info("Start processing arctic table major optimize task {} of {}: {}", task.getTaskId(),
         task.getTableIdentifier(), task);
+    if (supportCopyFiles()) {
+      LOG.info("Task {} of {} enables copy files, copy {} data files", task.getTaskId(), task.getTableIdentifier(),
+          task.dataFiles().size());
+      targetFiles = copyFiles(task.dataFiles(), task.getCustomHiveSubdirectory());
+      return buildOptimizeResult(targetFiles);
+    }
     Map<DataTreeNode, List<DeleteFile>> deleteFileMap = groupDeleteFilesByNode(task.posDeleteFiles());
     List<PrimaryKeyedFile> dataFiles = task.dataFiles();
 
@@ -77,6 +93,32 @@ public OptimizeTaskResult execute() throws Exception {
     return buildOptimizeResult(targetFiles);
   }
 
+  private boolean supportCopyFiles() {
+    return TableTypeUtil.isHive(table) && task.isCopyFiles() && task.getCustomHiveSubdirectory() != null &&
+        task.posDeleteFiles().isEmpty() && task.deleteFiles().isEmpty();
+  }
+
+  private Iterable<DataFile> copyFiles(List<PrimaryKeyedFile> dataFiles, String customHiveSubdirectory) {
+    // Only mixed hive tables support copying files
+    List<DataFile> targetFiles = new ArrayList<>(dataFiles.size());
+    for (PrimaryKeyedFile dataFile : dataFiles) {
+      String hiveLocation = HiveTableUtil.newHiveDataLocation(((SupportHive) table).hiveLocation(),
+          table.spec(), dataFile.partition(), customHiveSubdirectory);
+
+      String sourcePath = dataFile.path().toString();
+      String sourceFileName = TableFileUtils.getFileName(sourcePath);
+
+      // The copied files must be hidden files, which start with `.`, so Hive does not read them before commit
+      String targetPath = hiveLocation + File.separator + "." + sourceFileName;
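+      // copy the data file bytes as-is; this path never reads or rewrites records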
+      long startTime = System.currentTimeMillis();
+      table.io().copy(sourcePath, targetPath);
+      LOG.info("Successfully copied file {}, cost {} ms", sourcePath,
+          System.currentTimeMillis() - startTime);
+      targetFiles.add(DataFiles.builder(table.spec()).copy(dataFile).withPath(targetPath).build());
+    }
+    return targetFiles;
+  }
+
   @Override
   public void close() {
   }
@@ -88,9 +130,7 @@ private Iterable optimizeTable(CloseableIterator recordIterato
     } else {
       transactionId = null;
     }
-    long targetFileSize = PropertyUtil.propertyAsLong(table.properties(),
-        com.netease.arctic.table.TableProperties.SELF_OPTIMIZING_TARGET_SIZE,
-        com.netease.arctic.table.TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT);
+    long targetFileSize = targetFileSize(inputSize());
     TaskWriter<Record> writer = AdaptHiveGenericTaskWriterBuilder.builderFor(table)
         .withTransactionId(transactionId)
         .withTaskId(task.getAttemptId())
diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/NodeTask.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/NodeTask.java
index ef08172a43..227dd8a294 100644
--- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/NodeTask.java
+++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/executor/NodeTask.java
@@ -58,6 +58,7 @@ public class NodeTask {
   private TableIdentifier tableIdentifier;
   private int attemptId;
   private String customHiveSubdirectory;
+  private boolean copyFiles;
   private Long maxExecuteTime;
 
   public NodeTask(List<ContentFileWithSequence<?>> baseFiles,
@@ -258,6 +259,14 @@ public void setCustomHiveSubdirectory(String customHiveSubdirectory) {
     this.customHiveSubdirectory = customHiveSubdirectory;
   }
 
+  public boolean isCopyFiles() {
+    return copyFiles;
+  }
+
+  public void setCopyFiles(boolean copyFiles) {
+    this.copyFiles = copyFiles;
+  }
+
   public Long getMaxExecuteTime() {
     return maxExecuteTime;
   }
@@ -277,6 +286,7 @@ public String toString() {
         .add("taskId", taskId)
         .add("attemptId", attemptId)
         .add("tableIdentifier", tableIdentifier)
+        .add("copyFiles", copyFiles)
         .add("baseFiles", baseFiles.size())
         .add("insertFiles", insertFiles.size())
         .add("deleteFiles", deleteFiles.size())
diff --git a/optimizer/src/test/java/com/netease/arctic/optimizer/operator/executor/TestSupportHiveMajorExecutor.java b/optimizer/src/test/java/com/netease/arctic/optimizer/operator/executor/TestSupportHiveMajorExecutor.java
index 757fe0bf72..09adbd4a3c 100644
--- a/optimizer/src/test/java/com/netease/arctic/optimizer/operator/executor/TestSupportHiveMajorExecutor.java
+++ b/optimizer/src/test/java/com/netease/arctic/optimizer/operator/executor/TestSupportHiveMajorExecutor.java
@@ -23,11 +23,13 @@
 import com.netease.arctic.ams.api.OptimizeType;
 import com.netease.arctic.data.DataTreeNode;
 import com.netease.arctic.data.file.ContentFileWithSequence;
+import com.netease.arctic.hive.utils.HiveTableUtil;
 import com.netease.arctic.optimizer.OptimizerConfig;
 import com.netease.arctic.optimizer.util.ContentFileUtil;
 import com.netease.arctic.table.ArcticTable;
 import com.netease.arctic.table.TableProperties;
 import com.netease.arctic.table.UnkeyedTable;
+import com.netease.arctic.utils.TableFileUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -117,6 +119,26 @@ public void testUnKeyedTableFullMajorExecutor() throws Exception {
     });
   }
 
+  @Test
+  public void testUnKeyedTableFullMajorExecutorCopyFiles() throws Exception {
+    insertTableBaseDataFiles(testHiveTable, null, baseDataFilesInfo);
+    NodeTask nodeTask = constructNodeTask(testHiveTable, OptimizeType.FullMajor);
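+    // enable the copy-files path so MajorExecutor copies the data files instead of rewriting them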
+    nodeTask.setCopyFiles(true);
+    String hiveSubdirectory = HiveTableUtil.newHiveSubdirectory();
+    nodeTask.setCustomHiveSubdirectory(hiveSubdirectory);
+    String[] arg = new String[0];
+    OptimizerConfig optimizerConfig = new OptimizerConfig(arg);
+    optimizerConfig.setOptimizerId("UnitTest");
+    MajorExecutor majorExecutor = new MajorExecutor(nodeTask, testHiveTable, System.currentTimeMillis(), optimizerConfig);
+    OptimizeTaskResult result = majorExecutor.execute();
+    Assert.assertEquals(Iterables.size(result.getTargetFiles()), 10);
+    result.getTargetFiles().forEach(dataFile -> {
+      Assert.assertEquals(100, dataFile.recordCount());
+      Assert.assertEquals(testHiveTable.hiveLocation() + "/name=name/" + hiveSubdirectory,
+          TableFileUtils.getFileDir(dataFile.path().toString()));
+    });
+  }
+
   @Test
   public void testNoPartitionTableMajorExecutor() throws Exception {
     insertBasePosDeleteFiles(testUnPartitionKeyedHiveTable, 2L, baseDataFilesInfo, posDeleteFilesInfo);