[AMORO-1619] 0.4.x support copying large files when executing full optimizing of Unkeyed Mixed Hive Table #2356

Open · wants to merge 10 commits into base: 0.4.x
@@ -4,6 +4,7 @@ public class OptimizeTaskProperties {
// optimize task properties
public static final String ALL_FILE_COUNT = "all-file-cnt";
public static final String CUSTOM_HIVE_SUB_DIRECTORY = "custom-hive-sub-directory";
public static final String COPY_FILES_TO_NEW_LOCATION = "copy-files-to-new-location";
public static final String MAX_EXECUTE_TIME = "max-execute-time";
public static final String MOVE_FILES_TO_HIVE_LOCATION = "move-files-to-hive-location";
}
@@ -35,6 +35,7 @@
import com.netease.arctic.data.file.WrapFileWithSequenceNumberHelper;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.ChangeTable;
import com.netease.arctic.table.TableProperties;
import com.netease.arctic.table.UnkeyedTable;
import com.netease.arctic.utils.SerializationUtils;
import org.apache.iceberg.ContentFile;
@@ -43,6 +44,7 @@
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.PropertyUtil;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -180,6 +182,10 @@ protected BasicOptimizeTask buildOptimizeTask(@Nullable List<DataTreeNode> sourc
String customHiveSubdirectory = taskConfig.getCustomHiveSubdirectory();
if (customHiveSubdirectory != null) {
properties.put(OptimizeTaskProperties.CUSTOM_HIVE_SUB_DIRECTORY, customHiveSubdirectory);
if (insertFiles.isEmpty() && deleteFiles.isEmpty() && posDeleteFiles.isEmpty() &&
allLargeFiles(baseFiles)) {
properties.put(OptimizeTaskProperties.COPY_FILES_TO_NEW_LOCATION, true + "");
}
}
if (taskConfig.isMoveFilesToHiveLocation()) {
properties.put(OptimizeTaskProperties.MOVE_FILES_TO_HIVE_LOCATION, true + "");
@@ -195,6 +201,21 @@ protected void addOptimizeFiles() {
completeTree();
}

private boolean allLargeFiles(List<DataFile> files) {
long largeFileSize = getLargeFileSize();
return files.stream().allMatch(file -> file.fileSizeInBytes() >= largeFileSize);
}

/**
* Large files are files whose size is greater than or equal to 9/10 (by default) of the target size.
*/
protected long getLargeFileSize() {
long targetSize = PropertyUtil.propertyAsLong(arcticTable.properties(),
TableProperties.SELF_OPTIMIZING_TARGET_SIZE,
TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT);
return targetSize * 9 / 10;
}

private void completeTree() {
partitionFileTree.values().forEach(FileTree::completeTree);
}
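A quick, illustrative check of the threshold computed by getLargeFileSize above, assuming the default self-optimizing.target-size of 128 MB (numbers are not part of the patch):

// Illustrative only: 134217728 bytes is the default SELF_OPTIMIZING_TARGET_SIZE.
long targetSize = 134217728L;              // 128 MB
long largeFileSize = targetSize * 9 / 10;  // 120795955 bytes, roughly 115 MB
// A base file counts as "large" when fileSizeInBytes() >= largeFileSize, so a full-optimize
// task whose files are all at least ~115 MB (and which has no delete files) can be executed by copying.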
@@ -203,8 +203,7 @@ private List<BasicOptimizeTask> collectTasksWithBinPack(String partition, long t
false, constructCustomHiveSubdirectory(baseFiles)
);

List<List<DataFile>> packed = new BinPacking.ListPacker<DataFile>(taskSize, Integer.MAX_VALUE, true)
.pack(baseFiles, DataFile::fileSizeInBytes);
List<List<DataFile>> packed = binPackFiles(taskSize, baseFiles, !posDeleteFiles.isEmpty());
for (List<DataFile> files : packed) {
if (CollectionUtils.isNotEmpty(files)) {
collector.add(buildOptimizeTask(null,
@@ -216,6 +215,11 @@ false, constructCustomHiveSubdirectory(baseFiles)
return collector;
}

protected List<List<DataFile>> binPackFiles(long taskSize, List<DataFile> baseFiles, boolean deleteExist) {
return new BinPacking.ListPacker<DataFile>(taskSize, Integer.MAX_VALUE, true)
.pack(baseFiles, DataFile::fileSizeInBytes);
}

private long getOptimizingTargetSize() {
return CompatiblePropertyUtil.propertyAsLong(arcticTable.properties(),
TableProperties.SELF_OPTIMIZING_TARGET_SIZE,
@@ -29,9 +29,11 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.util.BinPacking;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -113,4 +115,36 @@ protected boolean partitionNeedPlan(String partitionToPath) {
protected boolean nodeTaskNeedBuild(String partition, List<DeleteFile> posDeleteFiles, List<DataFile> baseFiles) {
return true;
}

@Override
protected List<List<DataFile>> binPackFiles(long taskSize, List<DataFile> baseFiles, boolean deleteExist) {
if (deleteExist) {
return super.binPackFiles(taskSize, baseFiles, deleteExist);
} else {
// Large files are files whose size is greater than or equal to 9/10 (by default) of the target size;
// the rest are the remaining files. They are packed separately to avoid mixing small and large files in
// one task, because a task containing only large files can be executed by copying the files, which is
// more efficient than reading and rewriting them. For example, given 3 files of 120 MB, 8 MB and 8 MB
// with a target size of 128 MB: packing them together yields two tasks, 120 MB + 8 MB and 8 MB, while
// packing them separately yields two tasks, 120 MB and 8 MB + 8 MB, and the first of those can then be
// executed very efficiently by copying the file.
List<DataFile> largeFiles = new ArrayList<>();
List<DataFile> remainFiles = new ArrayList<>();
long largeFileSize = getLargeFileSize();
for (DataFile baseFile : baseFiles) {
if (baseFile.fileSizeInBytes() >= largeFileSize) {
largeFiles.add(baseFile);
} else {
remainFiles.add(baseFile);
}
}
List<List<DataFile>> packed = new ArrayList<>();
packed.addAll(new BinPacking.ListPacker<DataFile>(taskSize, Integer.MAX_VALUE, true)
.pack(largeFiles, DataFile::fileSizeInBytes));
packed.addAll(new BinPacking.ListPacker<DataFile>(taskSize, Integer.MAX_VALUE, true)
.pack(remainFiles, DataFile::fileSizeInBytes));

return packed;
}
}
}
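A minimal, self-contained sketch of the packing difference described in the comment above, run on raw sizes in MB rather than DataFiles (illustrative only; assumes imports of java.util.Arrays, java.util.List, and org.apache.iceberg.util.BinPacking, and uses 128 as the target size):

// Packing all files together typically places the 120 MB file in the same bin as an 8 MB file.
List<Long> allSizes = Arrays.asList(120L, 8L, 8L);
List<List<Long>> together = new BinPacking.ListPacker<Long>(128, Integer.MAX_VALUE, true)
    .pack(allSizes, Long::longValue);
// Packing large and remaining files separately keeps the 120 MB file alone in its bin,
// so the resulting task can be executed by copying the file.
List<List<Long>> largeOnly = new BinPacking.ListPacker<Long>(128, Integer.MAX_VALUE, true)
    .pack(Arrays.asList(120L), Long::longValue);        // [[120]]
List<List<Long>> remainOnly = new BinPacking.ListPacker<Long>(128, Integer.MAX_VALUE, true)
    .pack(Arrays.asList(8L, 8L), Long::longValue);       // [[8, 8]]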
@@ -13,6 +13,7 @@
import com.netease.arctic.table.TableBuilder;
import com.netease.arctic.table.TableIdentifier;
import com.netease.arctic.table.TableProperties;
import com.netease.arctic.table.UnkeyedTable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -59,6 +60,7 @@ public class OptimizingIntegrationTest {
private static final TableIdentifier TB_12 = TableIdentifier.of(ICEBERG_CATALOG, DATABASE, "iceberg_table12");
private static final TableIdentifier TB_13 = TableIdentifier.of(CATALOG, DATABASE, "test_table13");
private static final TableIdentifier TB_14 = TableIdentifier.of(CATALOG, DATABASE, "test_table14");
private static final TableIdentifier TB_15 = TableIdentifier.of(HIVE_CATALOG, DATABASE, "hive_table15");

private static final ConcurrentHashMap<String, ArcticCatalog> catalogsCache = new ConcurrentHashMap<>();

@@ -219,6 +221,16 @@ public void testHiveKeyedTableMajorOptimizeAndMove() throws TException, IOExcept
testCase.testHiveKeyedTableMajorOptimizeAndMove();
}

@Test
public void testHiveUnKeyedTableFullOptimizeCopyingFiles() throws TException, IOException {
createHiveArcticTable(TB_15, PrimaryKeySpec.noPrimaryKey(), PartitionSpec.unpartitioned());
assertTableExist(TB_15);
UnkeyedTable table = catalog(HIVE_CATALOG).loadTable(TB_15).asUnkeyedTable();
MixedHiveOptimizingTest testCase =
new MixedHiveOptimizingTest(table, TEST_HMS.getHiveClient(), getOptimizeHistoryStartId());
testCase.testHiveUnKeyedTableFullOptimizeCopyingFiles();
}

private static long getOptimizeHistoryStartId() {
return ServiceContainer.getOptimizeService().maxOptimizeHistoryId();
}
@@ -24,9 +24,11 @@
import com.netease.arctic.table.KeyedTable;
import com.netease.arctic.table.TableIdentifier;
import com.netease.arctic.table.TableProperties;
import com.netease.arctic.table.UnkeyedTable;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -120,6 +122,30 @@ public void testHiveKeyedTableMajorOptimizeAndMove() throws TException, IOExcept
assertOptimizeHangUp(tb, startId + offset);
}

public void testHiveUnKeyedTableFullOptimizeCopyingFiles() throws TException, IOException {
int offset = 1;
UnkeyedTable table = arcticTable.asUnkeyedTable();
TableIdentifier tb = table.id();
// write 2 small files and 1 large file
updateProperties(arcticTable, TableProperties.ENABLE_SELF_OPTIMIZING, false + "");
updateProperties(arcticTable, TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL, 1000 + "");
writeBase(table, rangeFromTo(1, 2, "aaa", quickDateWithZone(3)));
writeBase(table, rangeFromTo(3, 4, "aaa", quickDateWithZone(3)));
List<DataFile> largeFile = writeBase(table, rangeFromTo(5, 5000, "aaa", quickDateWithZone(3)));
long targetFileSize = (largeFile.get(0).fileSizeInBytes() - 10) * 10 / 9;
updateProperties(arcticTable, TableProperties.SELF_OPTIMIZING_TARGET_SIZE, targetFileSize + "");

// wait for the Full Optimize result
updateProperties(arcticTable, TableProperties.ENABLE_SELF_OPTIMIZING, true + "");
OptimizeHistory optimizeHistory = waitOptimizeResult(tb, startId + offset++);
assertOptimizeHistory(optimizeHistory, OptimizeType.FullMajor, 3, 2);
assertIdRange(readRecords(table), 1, 5000);
// assert files are in the hive location
assertIdRange(readHiveTableData(), 1, 5000);

assertOptimizeHangUp(tb, startId + offset);
}

private Record newRecord(Object... val) {
return newRecord(arcticTable.schema(), val);
}
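A quick check of the target-size arithmetic used in testHiveUnKeyedTableFullOptimizeCopyingFiles above (the large-file size here is hypothetical; the real value depends on the data written):

// Illustrative only: suppose the written large file is about 120 MB.
long largeFileBytes = 125_829_120L;                    // hypothetical size of the large base file
long targetFileSize = (largeFileBytes - 10) * 10 / 9;  // the value the test sets on the table
long threshold = targetFileSize * 9 / 10;              // roughly largeFileBytes - 10, just below the large file
// The large file clears the threshold and is planned into its own copy-only task, while the two
// small files fall below it and are rewritten into one file: 3 input files, 2 output files.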
7 changes: 7 additions & 0 deletions core/src/main/java/com/netease/arctic/io/ArcticFileIO.java
@@ -62,6 +62,13 @@ public interface ArcticFileIO extends FileIO, Configurable {
*/
void rename(String oldPath, String newPath);

/**
* Copy a file from the old path to the new path.
* @param oldPath - source path
* @param newPath - target path
*/
void copy(String oldPath, String newPath);

/** Delete a directory recursively
*
* @param path the path to delete.
19 changes: 19 additions & 0 deletions core/src/main/java/com/netease/arctic/io/ArcticHadoopFileIO.java
@@ -21,6 +21,7 @@
import com.netease.arctic.table.TableMetaStore;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
@@ -179,6 +180,24 @@ public void rename(String src, String dts) {
});
}

@Override
public void copy(String src, String dts) {
tableMetaStore.doAs(() -> {
Path srcPath = new Path(src);
Path dtsPath = new Path(dts);
FileSystem fs = getFs(srcPath);
if (fs.isDirectory(srcPath)) {
throw new IllegalArgumentException("Cannot copy a directory: " + src);
}
try {
FileUtil.copy(fs, srcPath, fs, dtsPath, false, true, conf());
} catch (IOException e) {
throw new UncheckedIOException("Failed to copy from " + src + " to " + dts, e);
}
return null;
});
}

@Override
public <T> T doAs(Callable<T> callable) {
return tableMetaStore.doAs(callable);
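A minimal usage sketch of the new copy method (paths are made up for illustration; the implementation above wraps any IOException in an UncheckedIOException and rejects directories):

// Hypothetical usage: relocate a base file into a Hive subdirectory without rewriting it.
ArcticFileIO io = arcticTable.io();
String source = "/warehouse/db/tbl/base/data-00001.parquet";                  // illustrative path
String target = "/warehouse/db/tbl/hive/custom-sub-dir/data-00001.parquet";   // illustrative path
io.copy(source, target);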
@@ -55,6 +55,11 @@ public void rename(String oldPath, String newPath) {
fileIO.rename(oldPath, newPath);
}

@Override
public void copy(String oldPath, String newPath) {
fileIO.copy(oldPath, newPath);
}

@Override
public void deleteDirectoryRecursively(String path) {
//Do not move trash when deleting directory as it is used for dropping table only
@@ -233,6 +233,8 @@ private NodeTask constructTask(ArcticTable table, OptimizeTask task, int attempt

String customHiveSubdirectory = properties.get(OptimizeTaskProperties.CUSTOM_HIVE_SUB_DIRECTORY);
nodeTask.setCustomHiveSubdirectory(customHiveSubdirectory);
nodeTask.setCopyFiles(
PropertyUtil.propertyAsBoolean(properties, OptimizeTaskProperties.COPY_FILES_TO_NEW_LOCATION, false));

Long maxExecuteTime = PropertyUtil.propertyAsLong(properties,
OptimizeTaskProperties.MAX_EXECUTE_TIME, TableProperties.SELF_OPTIMIZING_EXECUTE_TIMEOUT_DEFAULT);
@@ -33,6 +33,7 @@
import com.netease.arctic.utils.map.StructLikeCollections;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -129,4 +130,37 @@ protected void checkIfTimeout(Closeable writer) throws Exception {
actualExecuteTime, task.getMaxExecuteTime(), factor));
}
}

/**
* Estimates a larger max target file size than the target size to avoid creating tiny remainder
* files.
*
* <p>While we create tasks that should all be smaller than our target size, there is a chance
* that the actual data will end up being larger than our target size due to various factors, such as
* compression and serialization, which are outside our control. If this occurs, instead of making a
* single file that is close in size to our target, we would end up producing one file of the
* target size, and then a small extra file with the remaining data.
*
* <p>For example, if our target is 128 MB, we may generate a rewrite task that should be 120 MB.
* When we write the data we may find we actually have to write out 138 MB. If we use the target
* size while writing, we would produce a 128 MB file and a 10 MB file. If instead we use a
* larger size estimated by this method, then we end up writing a single file.
*/
protected long targetFileSize(long inputSize) {
long targetFileSize = PropertyUtil.propertyAsLong(table.properties(),
TableProperties.SELF_OPTIMIZING_TARGET_SIZE,
TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT);
long maxTargetFileSize = getMaxTargetFileSize(targetFileSize);
if (inputSize <= maxTargetFileSize) {
return maxTargetFileSize;
} else {
return targetFileSize;
}
}

private long getMaxTargetFileSize(long targetFileSize) {
int fragmentRatio = PropertyUtil.propertyAsInt(table.properties(), TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO,
TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT);
return targetFileSize + targetFileSize / fragmentRatio;
}
}
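A worked example of the sizing logic above, assuming the default table properties (128 MB target size, fragment ratio 8):

// Illustrative arithmetic only, using the defaults.
long targetFileSize = 134217728L;                              // 128 MB
long maxTargetFileSize = targetFileSize + targetFileSize / 8;  // 150994944 bytes = 144 MB
long inputSize = 138L * 1024 * 1024;                           // a 138 MB rewrite task
long rollAt = inputSize <= maxTargetFileSize ? maxTargetFileSize : targetFileSize;
// rollAt == maxTargetFileSize (144 MB), so the writer emits a single 138 MB file
// instead of a 128 MB file plus a 10 MB remainder.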
@@ -41,7 +41,6 @@
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -125,6 +124,10 @@ private List<? extends ContentFile<?>> optimizeDeleteFiles() throws Exception {
return icebergPosDeleteWriter.complete();
}

protected long inputSize() {
return task.allIcebergDataFiles().stream().mapToLong(ContentFile::fileSizeInBytes).sum();
}

private CombinedIcebergScanTask buildIcebergScanTask() {
return new CombinedIcebergScanTask(task.allIcebergDataFiles().toArray(new DataFileWithSequence[0]),
task.allIcebergDeleteFiles().toArray(new DeleteFileWithSequence[0]),
@@ -138,9 +141,7 @@ private List<? extends ContentFile<?>> optimizeDataFiles() throws Exception {
false, IdentityPartitionConverters::convertConstant, false, structLikeCollections);

String formatAsString = table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
long targetSizeByBytes = PropertyUtil.propertyAsLong(table.properties(),
com.netease.arctic.table.TableProperties.SELF_OPTIMIZING_TARGET_SIZE,
com.netease.arctic.table.TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT);
long targetSizeByBytes = targetFileSize(inputSize());

OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table.asUnkeyedTable(), table.spec().specId(),
task.getAttemptId()).build();