
[HUDI-5434] Fix archival in metadata table to not rely on completed rollback or clean in data table #7580

Merged: 3 commits, Jan 11, 2023
HoodieTimelineArchiver.java:
@@ -28,13 +28,13 @@
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.fs.StorageSchemes;
 import org.apache.hudi.common.model.HoodieArchivedLogFile;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -46,6 +46,7 @@
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -514,24 +515,27 @@ private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
         .setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
         .setConf(metaClient.getHadoopConf())
         .build();
-    Option<HoodieInstant> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant();
-
-    if (config.shouldArchiveBeyondSavepoint()) {
-      // There are chances that there could be holes in the timeline due to archival and savepoint interplay.
-      // So, the first non-savepoint commit in the data timeline is considered as beginning of the active timeline.
-      Option<HoodieInstant> firstNonSavepointCommit = dataMetaClient.getActiveTimeline().getFirstNonSavepointCommit();
-      if (firstNonSavepointCommit.isPresent()) {
-        String firstNonSavepointCommitTime = firstNonSavepointCommit.get().getTimestamp();
-        instants = instants.filter(instant ->
-            compareTimestamps(instant.getTimestamp(), LESSER_THAN, firstNonSavepointCommitTime));
-      }
-    } else {
-      // Do not archive the commits that live in data set active timeline.
-      // This is required by metadata table, see HoodieTableMetadataUtil#processRollbackMetadata for details.
-      if (earliestActiveDatasetCommit.isPresent()) {
-        instants = instants.filter(instant ->
-            compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get().getTimestamp()));
-      }
+    Option<HoodieInstant> qualifiedEarliestInstant =
+        TimelineUtils.getEarliestInstantForMetadataArchival(
+            dataMetaClient.getActiveTimeline(), config.shouldArchiveBeyondSavepoint());
+
+    // Do not archive the instants after the earliest commit (COMMIT, DELTA_COMMIT, and
+    // REPLACE_COMMIT only, considering non-savepoint commits only if archival beyond
+    // savepoint is enabled) and the earliest inflight instant (all actions).
+    // This is required by the metadata table, see HoodieTableMetadataUtil#processRollbackMetadata
+    // for details.
+    // Note that we cannot blindly use the earliest instant of all actions, because CLEAN and
+    // ROLLBACK instants are archived separately from commits (check
+    // HoodieTimelineArchiver#getCleanInstantsToArchive). If we did, a very old completed
+    // CLEAN or ROLLBACK instant could block the archival of the metadata table timeline and
+    // cause the active timeline of the metadata table to become extremely long, leading to
+    // performance issues when loading the timeline.
+    if (qualifiedEarliestInstant.isPresent()) {
+      instants = instants.filter(instant ->
+          compareTimestamps(
+              instant.getTimestamp(),
+              HoodieTimeline.LESSER_THAN,
+              qualifiedEarliestInstant.get().getTimestamp()));
     }
   }

Review thread on this hunk:

  Contributor: I have created a Jira to test savepoint beyond archival. I guess we
  should not be deleting the commits that are savepointed.
  https://issues.apache.org/jira/browse/HUDI-5525

  Contributor (author): Sounds good. This particular logic is for the metadata
  table's archival only. Nevertheless, we should still thoroughly test the archival
  beyond savepoints, orthogonal to this PR.

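To make the fix concrete, here is a brief sketch (an editor's illustration, not code from the PR's diff) of the boundary computation on the rollback scenario exercised by the test below. The timeline contents are hypothetical; dataMetaClient and config are in scope as in the hunk above.

    // Hypothetical data-table active timeline:
    //   00000002.rollback (COMPLETED)   <- stale completed rollback
    //   00000007.commit   (COMPLETED)
    //   00000008.commit   (COMPLETED)
    //
    // Before this change, firstInstant() picked 00000002.rollback as the boundary,
    // so metadata-table instants from "00000002" onward could never be archived,
    // even though CLEAN/ROLLBACK instants are archived separately
    // (HoodieTimelineArchiver#getCleanInstantsToArchive).
    // After this change, completed CLEAN/ROLLBACK instants are skipped and the
    // boundary becomes 00000007.commit:
    Option<HoodieInstant> boundary =
        TimelineUtils.getEarliestInstantForMetadataArchival(
            dataMetaClient.getActiveTimeline(), config.shouldArchiveBeyondSavepoint());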
TestHoodieTimelineArchiver.java:
@@ -1301,8 +1301,15 @@ public void testArchivalAndCompactionInMetadataTable() throws Exception {
         .setLoadActiveTimelineOnLoad(true).build();
 
     for (int i = 1; i <= 17; i++) {
-      testTable.doWriteOperation("000000" + String.format("%02d", i), WriteOperationType.UPSERT,
-          i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      if (i != 2) {
+        testTable.doWriteOperation("000000" + String.format("%02d", i), WriteOperationType.UPSERT,
+            i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      } else {
+        // For i == 2, roll back the first commit "00000001", so the active timeline of the
+        // data table has one rollback instant.
+        // The completed rollback should not block the archival in the metadata table.
+        testTable.doRollback("00000001", "00000002");
+      }
       // archival
       archiveAndGetCommitsList(writeConfig);
 
@@ -1323,10 +1330,9 @@
       } else if (i == 8) {
         // i == 8
         // The instant "00000000000000" was archived since it's less than
-        // the earliest instant on the dataset active timeline,
-        // the dataset active timeline has instants of range [00000001 ~ 00000008]
-        // because when it does the archiving, no compaction instant on the
-        // metadata active timeline exists yet.
+        // the earliest commit on the dataset active timeline,
+        // the dataset active timeline has instants:
+        // 00000002.rollback, 00000007.commit, 00000008.commit
         assertEquals(9, metadataTableInstants.size());
         assertTrue(metadataTableInstants.contains(
             new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000007001")));
TimelineUtils.java:
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -39,6 +40,11 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.SAVEPOINT_ACTION;
+
 /**
  * TimelineUtils provides a common way to query incremental meta-data changes for a hoodie table.
  *
@@ -87,15 +93,15 @@ public static List<String> getDroppedPartitions(HoodieTimeline timeline) {
   public static List<String> getAffectedPartitions(HoodieTimeline timeline) {
     return timeline.filterCompletedInstants().getInstantsAsStream().flatMap(s -> {
       switch (s.getAction()) {
-        case HoodieTimeline.COMMIT_ACTION:
-        case HoodieTimeline.DELTA_COMMIT_ACTION:
+        case COMMIT_ACTION:
+        case DELTA_COMMIT_ACTION:
           try {
             HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(s).get(), HoodieCommitMetadata.class);
             return commitMetadata.getPartitionToWriteStats().keySet().stream();
           } catch (IOException e) {
             throw new HoodieIOException("Failed to get partitions written at " + s, e);
-        }
-        case HoodieTimeline.REPLACE_COMMIT_ACTION:
+          }
+        case REPLACE_COMMIT_ACTION:
           try {
             HoodieReplaceCommitMetadata commitMetadata = HoodieReplaceCommitMetadata.fromBytes(
                 timeline.getInstantDetails(s).get(), HoodieReplaceCommitMetadata.class);
@@ -148,11 +154,11 @@ public static List<String> getAffectedPartitions(HoodieTimeline timeline) {
    * Get extra metadata for specified key from latest commit/deltacommit/replacecommit(eg. insert_overwrite) instant.
    */
   public static Option<String> getExtraMetadataFromLatest(HoodieTableMetaClient metaClient, String extraMetadataKey) {
-    return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
+    return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
         // exclude clustering commits for returning user stored extra metadata
         .filter(instant -> !isClusteringCommit(metaClient, instant))
         .findFirst().map(instant ->
-        getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty());
+            getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty());
   }
 
 /**
@@ -170,7 +176,7 @@ public static Option<String> getExtraMetadataFromLatestIncludeClustering(HoodieT
    */
   public static Map<String, Option<String>> getAllExtraMetadataForKey(HoodieTableMetaClient metaClient, String extraMetadataKey) {
     return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toMap(
-        HoodieInstant::getTimestamp, instant -> getMetadataValue(metaClient, extraMetadataKey, instant)));
+        HoodieInstant::getTimestamp, instant -> getMetadataValue(metaClient, extraMetadataKey, instant)));
   }
 
   private static Option<String> getMetadataValue(HoodieTableMetaClient metaClient, String extraMetadataKey, HoodieInstant instant) {
@@ -184,10 +190,10 @@ private static Option<String> getMetadataValue(HoodieTableMetaClient metaClient,
       throw new HoodieIOException("Unable to parse instant metadata " + instant, e);
     }
   }
 
   public static boolean isClusteringCommit(HoodieTableMetaClient metaClient, HoodieInstant instant) {
     try {
-      if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
+      if (REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
         // replacecommit is used for multiple operations: insert_overwrite/cluster etc.
         // Check operation type to see if this instant is related to clustering.
         HoodieReplaceCommitMetadata replaceMetadata = HoodieReplaceCommitMetadata.fromBytes(
@@ -240,10 +246,53 @@ public static HoodieCommitMetadata getCommitMetadata(
       HoodieInstant instant,
       HoodieTimeline timeline) throws IOException {
     byte[] data = timeline.getInstantDetails(instant).get();
-    if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+    if (instant.getAction().equals(REPLACE_COMMIT_ACTION)) {
       return HoodieReplaceCommitMetadata.fromBytes(data, HoodieReplaceCommitMetadata.class);
     } else {
       return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
     }
   }
+
+  /**
+   * Gets the qualified earliest instant from the active timeline of the data table
+   * for archival in the metadata table.
+   * <p>
+   * The qualified earliest instant is chosen as the earlier one between the earliest
+   * commit (COMMIT, DELTA_COMMIT, and REPLACE_COMMIT only, considering non-savepoint
+   * commits only if archival beyond savepoint is enabled) and the earliest inflight
+   * instant (all actions).
+   *
+   * @param dataTableActiveTimeline      the active timeline of the data table.
+   * @param shouldArchiveBeyondSavepoint whether to archive beyond savepoint.
+   * @return the instant meeting the requirement.
+   */
+  public static Option<HoodieInstant> getEarliestInstantForMetadataArchival(
+      HoodieActiveTimeline dataTableActiveTimeline, boolean shouldArchiveBeyondSavepoint) {
+    // This is for commits only, not including CLEAN, ROLLBACK, etc.
+    // When archival beyond savepoint is enabled, there are chances that there could be holes
+    // in the timeline due to archival and savepoint interplay. So, the first non-savepoint
+    // commit in the data timeline is considered as the beginning of the active timeline.
+    Option<HoodieInstant> earliestCommit = shouldArchiveBeyondSavepoint
+        ? dataTableActiveTimeline.getTimelineOfActions(
+            CollectionUtils.createSet(
+                COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION, SAVEPOINT_ACTION))
+        .getFirstNonSavepointCommit()
+        : dataTableActiveTimeline.getCommitsTimeline().firstInstant();
+    // This is for all instants which are in-flight
+    Option<HoodieInstant> earliestInflight =
+        dataTableActiveTimeline.filterInflightsAndRequested().firstInstant();
+
+    if (earliestCommit.isPresent() && earliestInflight.isPresent()) {
+      if (earliestCommit.get().compareTo(earliestInflight.get()) < 0) {
+        return earliestCommit;
+      }
+      return earliestInflight;
+    } else if (earliestCommit.isPresent()) {
+      return earliestCommit;
+    } else if (earliestInflight.isPresent()) {
+      return earliestInflight;
+    } else {
+      return Option.empty();
+    }
+  }
 }
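As a quick worked example of the selection rule in the Javadoc above, condensed from the unit tests in the test file below (the instants are hypothetical, and `timeline` is assumed to be built the way the tests' prepareActiveTimeline helper builds it):

    // Hypothetical data-table active timeline:
    //   001.rollback       COMPLETED  (not a commit action; ignored for the commit candidate)
    //   002.clean          COMPLETED  (not a commit action; ignored for the commit candidate)
    //   003.clean          REQUESTED  <- earliest inflight/requested instant, any action
    //   010.commit         COMPLETED  <- earliest commit (COMMIT/DELTA_COMMIT/REPLACE_COMMIT)
    //   011.replacecommit  COMPLETED
    //
    // The earlier of the two candidates wins, so pending work is never archived
    // out from under the metadata table and the boundary is 003.clean:
    assertEquals(
        Option.of(new HoodieInstant(REQUESTED, CLEAN_ACTION, "003")),
        TimelineUtils.getEarliestInstantForMetadataArchival(timeline, false));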
TestTimelineUtils.java:
@@ -56,8 +56,13 @@
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.SAVEPOINT_ACTION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -136,7 +141,7 @@ public void testGetPartitions() throws IOException {
       activeTimeline.createNewInstant(instant);
       activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap())));
 
-      HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts);
+      HoodieInstant cleanInstant = new HoodieInstant(true, CLEAN_ACTION, ts);
       activeTimeline.createNewInstant(cleanInstant);
       activeTimeline.saveAsComplete(cleanInstant, getCleanMetadata(olderPartition, ts));
     }
@@ -175,7 +180,7 @@ public void testGetPartitionsUnPartitioned() throws IOException {
       activeTimeline.createNewInstant(instant);
       activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, partitionPath, ts, 2, Collections.emptyMap())));
 
-      HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts);
+      HoodieInstant cleanInstant = new HoodieInstant(true, CLEAN_ACTION, ts);
       activeTimeline.createNewInstant(cleanInstant);
       activeTimeline.saveAsComplete(cleanInstant, getCleanMetadata(partitionPath, ts));
     }
@@ -339,6 +344,81 @@ public void verifyTimeline(List<HoodieInstant> expectedInstants, HoodieTimeline
     );
   }
 
+  @Test
+  public void testGetEarliestInstantForMetadataArchival() throws IOException {
+    // Empty timeline
+    assertEquals(
+        Option.empty(),
+        TimelineUtils.getEarliestInstantForMetadataArchival(
+            prepareActiveTimeline(new ArrayList<>()), false));
+
+    // Earlier requested clean action before commits
+    assertEquals(
+        Option.of(new HoodieInstant(REQUESTED, CLEAN_ACTION, "003")),
+        TimelineUtils.getEarliestInstantForMetadataArchival(
+            prepareActiveTimeline(
+                Arrays.asList(
+                    new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "001"),
+                    new HoodieInstant(COMPLETED, CLEAN_ACTION, "002"),
+                    new HoodieInstant(REQUESTED, CLEAN_ACTION, "003"),
+                    new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+                    new HoodieInstant(COMPLETED, REPLACE_COMMIT_ACTION, "011"))), false));
+
+    // No inflight instants
+    assertEquals(
+        Option.of(new HoodieInstant(COMPLETED, COMMIT_ACTION, "010")),
+        TimelineUtils.getEarliestInstantForMetadataArchival(
+            prepareActiveTimeline(
+                Arrays.asList(
+                    new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "001"),
+                    new HoodieInstant(COMPLETED, CLEAN_ACTION, "002"),
+                    new HoodieInstant(COMPLETED, CLEAN_ACTION, "003"),
+                    new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+                    new HoodieInstant(COMPLETED, REPLACE_COMMIT_ACTION, "011"))), false));
+
+    // Rollbacks only
+    assertEquals(
+        Option.of(new HoodieInstant(INFLIGHT, ROLLBACK_ACTION, "003")),
+        TimelineUtils.getEarliestInstantForMetadataArchival(
+            prepareActiveTimeline(
+                Arrays.asList(
+                    new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "001"),
+                    new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "002"),
+                    new HoodieInstant(INFLIGHT, ROLLBACK_ACTION, "003"))), false));
+
+    assertEquals(
+        Option.empty(),
+        TimelineUtils.getEarliestInstantForMetadataArchival(
+            prepareActiveTimeline(
+                Arrays.asList(
+                    new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "001"),
+                    new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "002"),
+                    new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "003"))), false));
+
+    // With savepoints
+    HoodieActiveTimeline timeline = prepareActiveTimeline(
+        Arrays.asList(
+            new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "001"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "003"),
+            new HoodieInstant(COMPLETED, SAVEPOINT_ACTION, "003"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011")));
+    assertEquals(
+        Option.of(new HoodieInstant(COMPLETED, COMMIT_ACTION, "003")),
+        TimelineUtils.getEarliestInstantForMetadataArchival(timeline, false));
+    assertEquals(
+        Option.of(new HoodieInstant(COMPLETED, COMMIT_ACTION, "010")),
+        TimelineUtils.getEarliestInstantForMetadataArchival(timeline, true));
+  }
+
+  private HoodieActiveTimeline prepareActiveTimeline(
+      List<HoodieInstant> activeInstants) throws IOException {
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+    when(mockMetaClient.scanHoodieInstantsFromFileSystem(any(), eq(true)))
+        .thenReturn(activeInstants);
+    return new HoodieActiveTimeline(mockMetaClient);
+  }
+
   private void verifyExtraMetadataLatestValue(String extraMetadataKey, String expected, boolean includeClustering) {
     final Option<String> extraLatestValue;
     if (includeClustering) {