[HUDI-2833][Design] Merge small archive files instead of expanding indefinitely. (#4078)

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
zhangyue19921010 and yuezhang committed Jan 19, 2022
1 parent 4bea758 commit 7647562dad9e0615273bd76f75e7280f5ae7b7ce
Showing 8 changed files with 639 additions and 46 deletions.
@@ -255,6 +255,22 @@ public class HoodieCompactionConfig extends HoodieConfig {
+ "record size estimate compute dynamically based on commit metadata. "
+ " This is critical in computing the insert parallelism and bin-packing inserts into small files.");

public static final ConfigProperty<Integer> ARCHIVE_MERGE_FILES_BATCH_SIZE = ConfigProperty
.key("hoodie.archive.merge.files.batch.size")
.defaultValue(10)
.withDocumentation("The number of small archive files to be merged at once.");

public static final ConfigProperty<Long> ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
.key("hoodie.archive.merge.small.file.limit.bytes")
.defaultValue(20L * 1024 * 1024)
.withDocumentation("This config sets the archive file size limit below which an archive file becomes a candidate to be selected as such a small file.");

public static final ConfigProperty<Boolean> ARCHIVE_MERGE_ENABLE = ConfigProperty
.key("hoodie.archive.merge.enable")
.defaultValue(false)
.withDocumentation("When enable, hoodie will auto merge several small archive files into larger one. It's"
+ " useful when storage scheme doesn't support append operation.");

/** @deprecated Use {@link #CLEANER_POLICY} and its methods instead */
@Deprecated
public static final String CLEANER_POLICY_PROP = CLEANER_POLICY.key();
@@ -547,6 +563,21 @@ public Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
return this;
}

public Builder withArchiveMergeFilesBatchSize(int number) {
compactionConfig.setValue(ARCHIVE_MERGE_FILES_BATCH_SIZE, String.valueOf(number));
return this;
}

public Builder withArchiveMergeSmallFileLimit(long size) {
compactionConfig.setValue(ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES, String.valueOf(size));
return this;
}

public Builder withArchiveMergeEnable(boolean enable) {
compactionConfig.setValue(ARCHIVE_MERGE_ENABLE, String.valueOf(enable));
return this;
}
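The same knobs through the typed builder methods added above; a minimal usage sketch (the HoodieWriteConfig/HoodieCompactionConfig wiring follows the project's existing builder pattern, and the table path is illustrative):

HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/sample-table") // illustrative base path
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withArchiveMergeEnable(true)
        .withArchiveMergeSmallFileLimit(20L * 1024 * 1024)
        .withArchiveMergeFilesBatchSize(10)
        .build())
    .build();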

public Builder compactionSmallFileSize(long smallFileLimitBytes) {
compactionConfig.setValue(PARQUET_SMALL_FILE_LIMIT, String.valueOf(smallFileLimitBytes));
return this;
@@ -1082,6 +1082,10 @@ public int getMinCommitsToKeep() {
return getInt(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP);
}

public int getArchiveMergeFilesBatchSize() {
return getInt(HoodieCompactionConfig.ARCHIVE_MERGE_FILES_BATCH_SIZE);
}

public int getParquetSmallFileLimit() {
return getInt(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT);
}
@@ -1110,6 +1114,14 @@ public boolean isAutoClean() {
return getBoolean(HoodieCompactionConfig.AUTO_CLEAN);
}

public boolean getArchiveMergeEnable() {
return getBoolean(HoodieCompactionConfig.ARCHIVE_MERGE_ENABLE);
}

public long getArchiveMergeSmallFileLimitBytes() {
return getLong(HoodieCompactionConfig.ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES);
}

public boolean isAutoArchive() {
return getBoolean(HoodieCompactionConfig.AUTO_ARCHIVE);
}
@@ -18,14 +18,19 @@

package org.apache.hudi.table;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan;
import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -36,8 +41,10 @@
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -57,6 +64,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
@@ -106,6 +114,19 @@ private Writer openWriter() {
}
}

public Writer reOpenWriter() {
try {
if (this.writer != null) {
this.writer.close();
this.writer = null;
}
this.writer = openWriter();
return writer;
} catch (IOException e) {
throw new HoodieException("Unable to initialize HoodieLogFormat writer", e);
}
}

private void close() {
try {
if (this.writer != null) {
@@ -122,7 +143,7 @@ private void close() {
public boolean archiveIfRequired(HoodieEngineContext context) throws IOException {
try {
List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());

verifyLastMergeArchiveFilesIfNecessary(context);
boolean success = true;
if (!instantsToArchive.isEmpty()) {
this.writer = openWriter();
@@ -134,12 +155,212 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
LOG.info("No Instants to archive");
}

if (shouldMergeSmallArchiveFiles()) {
mergeArchiveFilesIfNecessary(context);
}
return success;
} finally {
close();
}
}

public boolean shouldMergeSmallArchiveFiles() {
return config.getArchiveMergeEnable() && !StorageSchemes.isAppendSupported(metaClient.getFs().getScheme());
}
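Note: merging only kicks in when the filesystem scheme cannot append; by my reading of StorageSchemes, that typically means object stores such as S3, whereas HDFS supports append and can keep growing the latest archive log file instead of producing many small ones.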

/**
* Merge several small archive files into a new, larger one.
* Only used on filesystems that do not support the append operation.
* The whole merge operation has four stages:
* 1. Build a merge plan recording the merge candidates and the merged file name.
* 2. Merge the candidate files.
* 3. Delete all the candidates.
* 4. Delete the merge plan.
* @param context HoodieEngineContext
* @throws IOException
*/
private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
Path planPath = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
// Flush any remaining content and open a new writer
reOpenWriter();
// List all archive files
FileStatus[] fsStatuses = metaClient.getFs().globStatus(
new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
// Sort files by version suffix in reverse (implies reverse chronological order)
Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveFileVersionComparator());

int archiveMergeFilesBatchSize = config.getArchiveMergeFilesBatchSize();
long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();

List<FileStatus> mergeCandidate = getMergeCandidates(smallFileLimitBytes, fsStatuses);

if (mergeCandidate.size() >= archiveMergeFilesBatchSize) {
List<String> candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
// Before merging the archive files, persist the merge plan
String logFileName = computeLogFileName();
buildArchiveMergePlan(candidateFiles, planPath, logFileName);
// Merge the candidate archive files
mergeArchiveFiles(mergeCandidate);
// After merging, delete the replaced small archive files
deleteFilesParallelize(metaClient, candidateFiles, context, true);
LOG.info("Successfully deleted the replaced small archive files.");
// Finally, delete the merge plan, which marks the merge operation as succeeded
metaClient.getFs().delete(planPath, false);
LOG.info("Successfully merged the small archive files.");
}
}

/**
* Find the index of the latest archive file whose size exceeds the limit and use it as a break point,
* so that only newer archive files are checked and merged. This preserves the original order of
* archive files, which matters when loading archived instants with a time filter, see
* {@link HoodieArchivedTimeline} loadInstants(TimeRangeFilter filter, boolean loadInstantDetails, Function<GenericRecord, Boolean> commitsFilter)
* @param smallFileLimitBytes the small file size limit in bytes
* @param fsStatuses archive file statuses, sorted by version suffix in reverse order
* @return merge candidates
*/
private List<FileStatus> getMergeCandidates(long smallFileLimitBytes, FileStatus[] fsStatuses) {
int index = 0;
for (; index < fsStatuses.length; index++) {
if (fsStatuses[index].getLen() > smallFileLimitBytes) {
break;
}
}
return Arrays.stream(fsStatuses).limit(index).collect(Collectors.toList());
}
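A worked illustration of the break-point rule (hypothetical sizes, not from the commit): with statuses sorted newest-first and the default 20MB limit, the scan stops at the first file over the limit, so small files sitting behind a large one are never selected and file order is preserved.

// Hypothetical archive file sizes, sorted newest-first, mirroring the loop above.
long[] sizesNewestFirst = {5 * 1024 * 1024, 8 * 1024 * 1024, 30L * 1024 * 1024, 6 * 1024 * 1024};
long smallFileLimitBytes = 20L * 1024 * 1024;
int index = 0;
for (; index < sizesNewestFirst.length; index++) {
  if (sizesNewestFirst[index] > smallFileLimitBytes) {
    break;
  }
}
// index == 2: only the two newest files (5MB and 8MB) become merge candidates;
// the 6MB file behind the 30MB one is skipped.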

/**
* Get the name of the archive file that the merged content will be written to, for storage schemes
* that do not support append: roll the current writer's log file over to the next version.
*/
private String computeLogFileName() throws IOException {
String logWriteToken = writer.getLogFile().getLogWriteToken();
HoodieLogFile hoodieLogFile = writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
return hoodieLogFile.getFileName();
}
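For intuition (hypothetical file names, assuming the usual archive log naming of prefix, version and write token): if the current writer points at .commits_.archive.3_1-0-1, rollOver yields .commits_.archive.4_1-0-1, so the merged output lands in the next-version file and sorts ahead of the small files it replaces.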

/**
* Check for, and resolve, any failed and unfinished merge of small archive files left by a previous run.
* @param context HoodieEngineContext used to delete the small archive files in parallel if necessary.
* @throws IOException
*/
private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
if (shouldMergeSmallArchiveFiles()) {
Path planPath = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
HoodieWrapperFileSystem fs = metaClient.getFs();
// If the plan exists, the last merge of small archive files failed,
// and we need to revert or complete that action.
if (fs.exists(planPath)) {
HoodieMergeArchiveFilePlan plan = null;
try {
plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fs, planPath).get(), HoodieMergeArchiveFilePlan.class);
} catch (IOException e) {
LOG.warn("Parsing merge archive plan failed.", e);
// A partially readable plan file means the last merge action failed while writing the plan file.
fs.delete(planPath);
return;
}
Path mergedArchiveFile = new Path(metaClient.getArchivePath(), plan.getMergedArchiveFileName());
List<Path> candidates = plan.getCandidate().stream().map(Path::new).collect(Collectors.toList());
if (candidateAllExists(candidates)) {
// The last merge action failed while writing the merged archive file,
// and none of the small archive files have been deleted yet.
// Revert the last action by deleting the merged archive file if it exists.
if (fs.exists(mergedArchiveFile)) {
fs.delete(mergedArchiveFile, false);
}
} else {
// The last merge action failed while deleting the small archive files,
// but the merged archive file was written completely.
// Try to complete the last action.
if (fs.exists(mergedArchiveFile)) {
deleteFilesParallelize(metaClient, plan.getCandidate(), context, true);
}
}

fs.delete(planPath);
}
}
}
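To summarize the recovery paths above in one place (a restatement of the logic, not new behavior):

// Recovery matrix, keyed off the plan file and the candidate files found on storage:
//   plan unreadable                       -> crashed while writing the plan;        just delete the plan
//   plan readable, all candidates present -> crashed while writing the merged file; delete the merged file, then the plan
//   plan readable, some candidate missing -> crashed while deleting candidates;     finish the deletions, then the plan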

/**
* If all the candidate small archive files still exist, the last merge operation failed while writing the merged archive file.
* If at least one candidate is missing, the merged archive file was written completely and the last operation failed while deleting the small archive files.
*/
private boolean candidateAllExists(List<Path> candidates) throws IOException {
for (Path archiveFile : candidates) {
if (!metaClient.getFs().exists(archiveFile)) {
// candidate is deleted
return false;
}
}
return true;
}

public void buildArchiveMergePlan(List<String> compactCandidate, Path planPath, String compactedArchiveFileName) throws IOException {
LOG.info("Start to build archive merge plan.");
HoodieMergeArchiveFilePlan plan = HoodieMergeArchiveFilePlan.newBuilder()
.setCandidate(compactCandidate)
.setMergedArchiveFileName(compactedArchiveFileName)
.build();
Option<byte[]> content = TimelineMetadataUtils.serializeAvroMetadata(plan, HoodieMergeArchiveFilePlan.class);
// Persist the merge plan to the archive path.
FileIOUtils.createFileInPath(metaClient.getFs(), planPath, content);
LOG.info("Successfully built the archive merge plan.");
}

public void mergeArchiveFiles(List<FileStatus> compactCandidate) throws IOException {
LOG.info("Starting to merge small archive files.");
Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
try {
List<IndexedRecord> records = new ArrayList<>();
for (FileStatus fs : compactCandidate) {
// Read the archived file
try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
// Read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
List<IndexedRecord> recordsPerFile = blk.getRecords();
records.addAll(recordsPerFile);
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
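// Flush once the buffer reaches the configured archival batch size
// (writeToFile is expected to append the records as a data block and clear the buffer).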
writeToFile(wrapperSchema, records);
}
}
}
}
writeToFile(wrapperSchema, records);
} catch (Exception e) {
throw new HoodieCommitException("Failed to merge small archive files", e);
} finally {
writer.close();
}
LOG.info("Success to merge small archive files.");
}

private Map<String, Boolean> deleteFilesParallelize(HoodieTableMetaClient metaClient, List<String> paths, HoodieEngineContext context, boolean ignoreFailed) {

return FSUtils.parallelizeFilesProcess(context,
metaClient.getFs(),
config.getArchiveDeleteParallelism(),
pairOfSubPathAndConf -> {
Path file = new Path(pairOfSubPathAndConf.getKey());
try {
FileSystem fs = metaClient.getFs();
if (fs.exists(file)) {
return fs.delete(file, false);
}
return true;
} catch (IOException e) {
if (!ignoreFailed) {
throw new HoodieIOException("Failed to delete : " + file, e);
} else {
LOG.warn("Ignore failed deleting : " + file);
return true;
}
}
},
paths);
}

private Stream<HoodieInstant> getCleanInstantsToArchive() {
HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION, HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants();
@@ -238,22 +459,7 @@ private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, Hoo
).map(Path::toString).collect(Collectors.toList());

context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants");
Map<String, Boolean> resultDeleteInstantFiles = FSUtils.parallelizeFilesProcess(context,
metaClient.getFs(),
config.getArchiveDeleteParallelism(),
pairOfSubPathAndConf -> {
Path commitFile = new Path(pairOfSubPathAndConf.getKey());
try {
FileSystem fs = commitFile.getFileSystem(pairOfSubPathAndConf.getValue().get());
if (fs.exists(commitFile)) {
return fs.delete(commitFile, false);
}
return true;
} catch (IOException e) {
throw new HoodieIOException("Failed to delete archived instant " + commitFile, e);
}
},
instantFiles);
Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, instantFiles, context, false);

for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
