Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMet
}

protected List<String> getAllExistingFileIds(String partitionPath) {
// because new commit is not complete. it is safe to mark all existing file Ids as old files
return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
// we should only fetch the latest merged file slices with committed data
return table.getSliceView().getLatestMergedFileSlicesBeforeOrOn(partitionPath, instantTime).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes here are intended to achieve the following two points:

  1. Only views completed before this instant should be visible, in order to ensure transactionality.
  2. As the replaced file id, we should select the file id of the file slice in which valid committed data actually exists.

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.hudi.common.model;

import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.CompletionTimeQueryView;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
Expand Down Expand Up @@ -119,6 +121,10 @@ public void addLogFile(CompletionTimeQueryView completionTimeQueryView, HoodieLo
@VisibleForTesting
public String getBaseInstantTime(CompletionTimeQueryView completionTimeQueryView, HoodieLogFile logFile) {
if (fileSlices.isEmpty()) {
if (completionTimeQueryView.getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT) && !completionTimeQueryView.isCompleted(logFile.getDeltaCommitTime())) {
// no base file exists in the file group and the smallest log file is still pending; use INIT_INSTANT_TS for this uncertain case.
return HoodieActiveTimeline.INIT_INSTANT_TS;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a version compatibility issue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a version compatibility issue?

Indeed, it is. Essentially, it is because the log file name of the old version contained the base instant time of this slice, while the log file name of the new version only contains the instant written to this file.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update:
When only the log file with the smallest instant has not yet completed, we adopt INIT_INSTANT_TS as the base instant, because at that point we have no appropriate way to select a reasonable base instant.

}
// no base file in the file group, use the log file delta commit time.
return logFile.getDeltaCommitTime();
}
Expand All @@ -130,7 +136,6 @@ public String getBaseInstantTime(CompletionTimeQueryView completionTimeQueryView
return commitTime;
}
}
// no base file that starts earlier than the log delta commit completion time,
// use the log file delta commit time.
return logFile.getDeltaCommitTime();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,39 @@

package org.apache.hudi.common.table.timeline;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.Option;

import java.io.Serializable;
import java.util.List;
import java.util.function.Function;

public interface CompletionTimeQueryView extends AutoCloseable {
public abstract class CompletionTimeQueryView implements AutoCloseable, Serializable {

boolean isCompleted(String beginInstantTime);
protected HoodieTableMetaClient metaClient;

public CompletionTimeQueryView(HoodieTableMetaClient metaClient) {
this.metaClient = metaClient;
}

public abstract boolean isCompleted(String beginInstantTime);

/**
* Returns whether the instant is archived.
*/
boolean isArchived(String instantTime);
public abstract boolean isArchived(String instantTime);

/**
* Returns whether the give instant time {@code instantTime} completed before the base instant {@code baseInstant}.
*/
boolean isCompletedBefore(String baseInstant, String instantTime);
public abstract boolean isCompletedBefore(String baseInstant, String instantTime);

/**
* Returns whether the given instant time {@code instantTime} is sliced after or on the base instant {@code baseInstant}.
*/
boolean isSlicedAfterOrOn(String baseInstant, String instantTime);
public abstract boolean isSlicedAfterOrOn(String baseInstant, String instantTime);

/**
* Get completion time with a base instant time as a reference to fix the compatibility.
Expand All @@ -51,7 +60,7 @@ public interface CompletionTimeQueryView extends AutoCloseable {
*
* @return Probability fixed completion time.
*/
Option<String> getCompletionTime(String baseInstant, String instantTime);
public abstract Option<String> getCompletionTime(String baseInstant, String instantTime);

/**
* Queries the completion time with given instant time.
Expand All @@ -60,7 +69,7 @@ public interface CompletionTimeQueryView extends AutoCloseable {
*
* @return The completion time if the instant finished or empty if it is still pending.
*/
Option<String> getCompletionTime(String beginTime);
public abstract Option<String> getCompletionTime(String beginTime);

/**
* Queries the instant times with given completion time range.
Expand All @@ -74,7 +83,7 @@ public interface CompletionTimeQueryView extends AutoCloseable {
*
* @return The sorted instant time list.
*/
List<String> getInstantTimes(
public abstract List<String> getInstantTimes(
HoodieTimeline timeline,
Option<String> startCompletionTime,
Option<String> endCompletionTime,
Expand All @@ -90,7 +99,7 @@ List<String> getInstantTimes(
*
* @return The sorted instant time list.
*/
List<String> getInstantTimes(
public abstract List<String> getInstantTimes(
String startCompletionTime,
String endCompletionTime,
Function<String, String> earliestInstantTimeFunc);
Expand All @@ -99,11 +108,15 @@ List<String> getInstantTimes(
* Get Cursor Instant
* @return
*/
String getCursorInstant();
public abstract String getCursorInstant();

/**
* Return true if the table is empty.
* @return
*/
boolean isEmptyTable();
public abstract boolean isEmptyTable();

/**
 * Returns the version of the table backing this view, as recorded in the table config
 * of the {@code metaClient} supplied at construction time.
 */
public HoodieTableVersion getTableVersion() {
return metaClient.getTableConfig().getTableVersion();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;

import java.io.Serializable;
import java.time.Instant;
import java.util.Date;
import java.util.List;
Expand All @@ -39,15 +38,13 @@
import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;

public class CompletionTimeQueryViewV1 implements CompletionTimeQueryView, Serializable {
public class CompletionTimeQueryViewV1 extends CompletionTimeQueryView {
private static final long serialVersionUID = 1L;

private static final long MILLI_SECONDS_IN_THREE_DAYS = 3 * 24 * 3600 * 1000;

private static final long MILLI_SECONDS_IN_ONE_DAY = 24 * 3600 * 1000;

private final HoodieTableMetaClient metaClient;

/**
* Mapping from instant start time -> completion time.
* Should be thread-safe data structure.
Expand Down Expand Up @@ -83,7 +80,7 @@ public CompletionTimeQueryViewV1(HoodieTableMetaClient metaClient) {
* @param eagerLoadInstant The earliest instant time to eagerly load from, by default load last N days of completed instants.
*/
public CompletionTimeQueryViewV1(HoodieTableMetaClient metaClient, String eagerLoadInstant) {
this.metaClient = metaClient;
super(metaClient);
this.beginToCompletionInstantTimeMap = new ConcurrentHashMap<>();
this.cursorInstant = InstantComparison.minInstant(eagerLoadInstant, metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::requestedTime).orElse(""));
// Note: use getWriteTimeline() to keep sync with the fs view visibleCommitsAndCompactionTimeline, see AbstractTableFileSystemView.refreshTimeline.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import org.apache.avro.generic.GenericRecord;

import java.io.Serializable;
import java.time.Instant;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -53,7 +52,7 @@
/**
* Query view for instant completion time.
*/
public class CompletionTimeQueryViewV2 implements CompletionTimeQueryView, Serializable {
public class CompletionTimeQueryViewV2 extends CompletionTimeQueryView {
private static final long serialVersionUID = 1L;

private static final long MILLI_SECONDS_IN_THREE_DAYS = 3 * 24 * 3600 * 1000;
Expand All @@ -63,8 +62,6 @@ public class CompletionTimeQueryViewV2 implements CompletionTimeQueryView, Seria
private static final Function<String, String> GET_INSTANT_ONE_DAY_BEFORE = instant ->
HoodieInstantTimeGenerator.instantTimeMinusMillis(instant, MILLI_SECONDS_IN_ONE_DAY);

private final HoodieTableMetaClient metaClient;

/**
* Mapping from instant time -> completion time.
* Should be thread-safe data structure.
Expand Down Expand Up @@ -100,7 +97,7 @@ public CompletionTimeQueryViewV2(HoodieTableMetaClient metaClient) {
* @param eagerLoadInstant The earliest instant time to eagerly load from, by default load last N days of completed instants.
*/
public CompletionTimeQueryViewV2(HoodieTableMetaClient metaClient, String eagerLoadInstant) {
this.metaClient = metaClient;
super(metaClient);
this.instantTimeToCompletionTimeMap = new ConcurrentHashMap<>();
this.cursorInstant = InstantComparison.minInstant(eagerLoadInstant, metaClient.getActiveTimeline().firstInstant().map(HoodieInstant::requestedTime).orElse(""));
// Note: use getWriteTimeline() to keep sync with the fs view visibleCommitsAndCompactionTimeline, see AbstractTableFileSystemView.refreshTimeline.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,27 @@ private Stream<FileSlice> filterUncommittedFiles(FileSlice fileSlice, boolean in
return Stream.of(fileSlice);
}

/**
 * Filters out a file slice that holds no committed data.
 *
 * <p>A slice consisting entirely of pending log files is dropped, because the file group
 * it represents has not committed any data yet. A slice that is empty only because it sits
 * behind a pending compaction is kept.
 *
 * @param fileSlice File Slice
 * @return Stream of FileSlice
 */
private Stream<FileSlice> filterOutEmptyFileSlice(FileSlice fileSlice) {
  // Non-empty slices always pass through untouched.
  if (!fileSlice.isEmpty()) {
    return Stream.of(fileSlice);
  }
  // a. the base instant is a pending compaction: still return the file slice.
  if (isFileSliceAfterPendingCompaction(fileSlice)) {
    LOG.debug("File Slice ({}) is in pending compaction", fileSlice);
    return Stream.of(fileSlice);
  }
  // b. the base instant is a normal write: drop the slice, the entire file group is empty.
  LOG.debug("File Slice ({}) is empty", fileSlice);
  return Stream.of();
}

/**
* Ignores the uncommitted log files.
*
Expand Down Expand Up @@ -867,6 +888,10 @@ public final Stream<FileSlice> getLatestFileSlices(String partitionStr) {
.flatMap(slice -> tableVersion8AndAbove()
? this.filterUncommittedFiles(slice, true)
: this.filterBaseFileAfterPendingCompaction(slice, true))
.flatMap(slice -> tableVersion8AndAbove()
// for table version 8 and above, we need to filter out empty file slices
? this.filterOutEmptyFileSlice(slice)
: Stream.of(slice))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should filter out the empty slice here: this slice is empty not because of a previous pending compaction, but because there are no valid commits at all in it.

.map(this::addBootstrapBaseFileIfPresent);
} finally {
readLock.unlock();
Expand Down Expand Up @@ -899,7 +924,11 @@ public final Stream<FileSlice> getLatestFileSlicesStateless(String partitionStr)
.filter(Option::isPresent).map(Option::get)
.flatMap(slice -> tableVersion8AndAbove()
? this.filterUncommittedFiles(slice, true)
: this.filterBaseFileAfterPendingCompaction(slice, true));
: this.filterBaseFileAfterPendingCompaction(slice, true))
.flatMap(slice -> tableVersion8AndAbove()
// for table version 8 and above, we need to filter out empty file slices
? this.filterOutEmptyFileSlice(slice)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

: Stream.of(slice));

if (bootstrapIndex.useIndex()) {
final Map<HoodieFileGroupId, BootstrapBaseFileMapping> bootstrapBaseFileMappings = getBootstrapBaseFileMappings(partition);
Expand Down Expand Up @@ -930,9 +959,13 @@ public final Option<FileSlice> getLatestFileSlice(String partitionStr, String fi
if (!fs.isPresent()) {
return Option.empty();
}
Stream<FileSlice> fileSlices = tableVersion8AndAbove()
Stream<FileSlice> fileSlices = (tableVersion8AndAbove()
? this.filterUncommittedFiles(fs.get(), true)
: this.filterBaseFileAfterPendingCompaction(fs.get(), true);
: this.filterBaseFileAfterPendingCompaction(fs.get(), true))
.flatMap(slice -> tableVersion8AndAbove()
// for table version 8 and above, we need to filter out empty file slices
? this.filterOutEmptyFileSlice(slice)
: Stream.of(slice));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above


return Option.ofNullable(fileSlices
.map(this::addBootstrapBaseFileIfPresent)
Expand Down Expand Up @@ -1051,6 +1084,9 @@ public final Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partit
fileSlice = Option.of(fetchMergedFileSlice(fileGroup, tableVersion8AndAbove()
? filterUncommittedLogs(fileSlice.get()) : fileSlice.get())
);
if (fileSlice.isPresent() && tableVersion8AndAbove()) {
fileSlice = Option.fromJavaOptional(filterOutEmptyFileSlice(fileSlice.get()).findAny());
}
}
return fileSlice;
}).filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
Expand Down Expand Up @@ -588,7 +589,10 @@ private static boolean shouldMergeFileSlice(FileSlice oldSlice, FileSlice newSli
// 2. the old file slice has no compaction scheduled, while the new file slice does,
// if they have different base instant time, should not merge;
// 3. the new file slice has compaction been scheduled, should not merge if old and new base instant time is different.
return isFileSliceWithoutCompactionBarrier(newSlice) || newSlice.getBaseInstantTime().equals(oldSlice.getBaseInstantTime());
// 4. the new file slice's base log file is still pending (base instant is INIT_INSTANT_TS), should merge.
return isFileSliceWithoutCompactionBarrier(newSlice)
|| newSlice.getBaseInstantTime().equals(oldSlice.getBaseInstantTime())
|| newSlice.getBaseInstantTime().equals(HoodieActiveTimeline.INIT_INSTANT_TS);
}

private static boolean isFileSliceWithoutCompactionBarrier(FileSlice fileSlice) {
Expand Down
Loading
Loading