Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-2796] Metadata table support for Restore action to first commit #4039

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -339,21 +339,11 @@ private <T extends SpecificRecordBase> boolean isBootstrapNeeded(Option<HoodieIn
return false;
}

boolean isRollbackAction = false;
List<String> rollbackedTimestamps = Collections.emptyList();
if (actionMetadata.isPresent() && actionMetadata.get() instanceof HoodieRollbackMetadata) {
isRollbackAction = true;
List<HoodieInstantInfo> rollbackedInstants =
((HoodieRollbackMetadata) actionMetadata.get()).getInstantsRollback();
rollbackedTimestamps = rollbackedInstants.stream().map(instant -> {
return instant.getCommitTime().toString();
}).collect(Collectors.toList());
}

// Detect the commit gaps if any from the data and the metadata active timeline
if (dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(
latestMetadataInstant.get().getTimestamp())
&& (!isRollbackAction || !rollbackedTimestamps.contains(latestMetadataInstantTimestamp))) {
LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived."
&& !isCommitRevertedByInFlightAction(actionMetadata, latestMetadataInstantTimestamp)) {
LOG.error("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived."
+ " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp()
+ ", latestDataInstant=" + dataMetaClient.getActiveTimeline().firstInstant().get().getTimestamp());
return true;
Expand All @@ -362,10 +352,59 @@ private <T extends SpecificRecordBase> boolean isBootstrapNeeded(Option<HoodieIn
return false;
}

/**
* Is the latest commit instant reverted by the in-flight instant action?
*
* @param actionMetadata - In-flight instant action metadata
* @param latestMetadataInstantTimestamp - Metadata table latest instant timestamp
* @param <T> - ActionMetadata type
* @return True if the latest instant action is reverted by the action
*/
private <T extends SpecificRecordBase> boolean isCommitRevertedByInFlightAction(Option<T> actionMetadata,
final String latestMetadataInstantTimestamp) {
if (!actionMetadata.isPresent()) {
return false;
}

final String INSTANT_ACTION = (actionMetadata.get() instanceof HoodieRollbackMetadata
? HoodieTimeline.ROLLBACK_ACTION
: (actionMetadata.get() instanceof HoodieRestoreMetadata ? HoodieTimeline.RESTORE_ACTION : ""));

List<String> affectedInstantTimestamps;
switch (INSTANT_ACTION) {
case HoodieTimeline.ROLLBACK_ACTION:
List<HoodieInstantInfo> rollbackedInstants =
((HoodieRollbackMetadata) actionMetadata.get()).getInstantsRollback();
affectedInstantTimestamps = rollbackedInstants.stream().map(instant -> {
return instant.getCommitTime().toString();
}).collect(Collectors.toList());

if (affectedInstantTimestamps.contains(latestMetadataInstantTimestamp)) {
return true;
}
break;
case HoodieTimeline.RESTORE_ACTION:
List<HoodieInstantInfo> restoredInstants =
((HoodieRestoreMetadata) actionMetadata.get()).getRestoreInstantInfo();
affectedInstantTimestamps = restoredInstants.stream().map(instant -> {
return instant.getCommitTime().toString();
}).collect(Collectors.toList());

if (affectedInstantTimestamps.contains(latestMetadataInstantTimestamp)) {
return true;
}
break;
default:
return false;
}

return false;
}

/**
* Initialize the Metadata Table by listing files and partitions from the file system.
*
* @param dataMetaClient {@code HoodieTableMetaClient} for the dataset.
* @param dataMetaClient {@code HoodieTableMetaClient} for the dataset.
* @param inflightInstantTimestamp
*/
private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient,
Expand Down
Expand Up @@ -81,6 +81,6 @@ protected final void writeTableMetadata(HoodieRollbackMetadata metadata) {
* @param metadata restore metadata of interest.
*/
protected final void writeTableMetadata(HoodieRestoreMetadata metadata) {
table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime));
table.getMetadataWriter(Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime));
}
}
Expand Up @@ -139,7 +139,8 @@ void testCOWToMORConvertedTableRollback(boolean rollbackUsingMarkers) throws Exc
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE);
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
addConfigsForPopulateMetaFields(cfgBuilder, true);
HoodieWriteConfig cfg = cfgBuilder.build();

Expand Down Expand Up @@ -294,7 +295,8 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro
@Test
void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
boolean populateMetaFields = true;
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build());
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();

Expand Down Expand Up @@ -344,7 +346,7 @@ void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
newCommitTime = "002";
// WriteClient with custom config (disable small file handling)
HoodieWriteConfig smallFileWriteConfig = getHoodieWriteConfigWithSmallFileHandlingOffBuilder(populateMetaFields)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
try (SparkRDDWriteClient nClient = getHoodieWriteClient(smallFileWriteConfig)) {
nClient.startCommitWithTime(newCommitTime);

Expand Down