Skip to content

Commit

Permalink
Addressing feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan committed Aug 30, 2023
1 parent 5678847 commit 98d5cfb
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ protected HoodieBackedTableMetadataWriter(Configuration hadoopConf,

this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();

if (dataMetaClient.getTableConfig().isMetadataTableAvailable() || writeConfig.isMetadataTableEnabled()) {
if (writeConfig.isMetadataTableEnabled()) {
this.metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig, failedWritesCleaningPolicy);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext con
protected Option<HoodieTableMetadataWriter> getMetadataWriter(
String triggeringInstantTimestamp,
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy) {
if (!HoodieTableMetadata.isMetadataTable(metaClient.getBasePathV2().toString()) && config.isMetadataTableEnabled()) {
if (config.isMetadataTableEnabled()) {
// if any partition is deleted, we need to reload the metadata table writer so that new table configs are picked up
// to reflect the delete mdt partitions.
deleteMetadataIndexIfNecessary();
Expand All @@ -112,6 +112,7 @@ protected Option<HoodieTableMetadataWriter> getMetadataWriter(
throw new HoodieMetadataException("Checking existence of metadata table failed", e);
}
} else {
// if metadata is not enabled in the write config, we should try and delete it (if present)
maybeDeleteMetadataTable();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,6 @@ public ExplicitMatchRange(Set<String> instants) {

@Override
public boolean isInRange(String instant) {
// if instant time is solo commit time or any special instant time that has suffix in MDT (which will not match DT's commit times), we treat them as valid.
if (instant.startsWith(SOLO_COMMIT_TIMESTAMP) || instant.length() == 20) {
return true;
}
return this.instants.contains(instant);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ static boolean isValidSuffix(String suffix) {
// This suffix and all after that are used for initialization of the various partitions. The unused suffixes lower than this value
// are reserved for future operations on the MDT.
private static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10; // corresponds to "010";
// we have max of 4 partitions (FILES, COL_STATS, BLOOM, RLI)
private static final List<String> VALID_PARTITION_INITIALIZATION_TIME_SUFFIXES = Arrays.asList("010","011","012","013");

/**
* Returns whether the files partition of metadata table is ready for read.
Expand Down Expand Up @@ -1282,10 +1284,21 @@ public static Set<String> getValidInstantTimestamps(HoodieTableMetaClient dataMe
validInstantTimestamps.addAll(getRollbackedCommits(instant, datasetTimeline));
});

// add restore instants from MDT.
// add restore and rollback instants from MDT.
metadataMetaClient.getActiveTimeline().getRollbackAndRestoreTimeline().filterCompletedInstants()
.filter(instant -> instant.getAction().equals(HoodieTimeline.RESTORE_ACTION))
.filter(instant -> instant.getAction().equals(HoodieTimeline.RESTORE_ACTION) || instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION))
.getInstants().forEach(instant -> validInstantTimestamps.add(instant.getTimestamp()));

// add log compactions instants(005) from metadata table timeline which are complete.
metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
.filter(instant -> instant.getTimestamp().length() == 20 && instant.getTimestamp().endsWith(OperationSuffix.LOG_COMPACTION.getSuffix()))
.getInstants().forEach(instant -> validInstantTimestamps.add(instant.getTimestamp()));

// add any valid delta commit that initialized new partition. suffix could be dynamic.
metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
.filter(instant -> instant.getTimestamp().length() == 20 && VALID_PARTITION_INITIALIZATION_TIME_SUFFIXES.contains(instant.getTimestamp().substring(17, 20)))
.getInstants().forEach(instant -> validInstantTimestamps.add(instant.getTimestamp()));

return validInstantTimestamps;
}

Expand Down

0 comments on commit 98d5cfb

Please sign in to comment.