Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,23 @@ public static HoodieDefaultTimeline getTimeline(HoodieTableMetaClient metaClient
return activeTimeline;
}

/**
* Returns the incremental timeline for meta sync.
*
* <p>The archived timeline may be parsed if the last synced commit time
* is far behind, the parsing of archived timeline is expensive, for most of the time,
* there is no need to do that, if the metadata is synced in time regularly.
*
* @param metaClient The meta client
* @param lastCommitTimeSynced The last synced commit time
*/
public static HoodieTimeline getIncSyncTimeline(HoodieTableMetaClient metaClient, String lastCommitTimeSynced) {
final HoodieDefaultTimeline timeline = metaClient.getActiveTimeline().isBeforeTimelineStarts(lastCommitTimeSynced)
? metaClient.getArchivedTimeline(lastCommitTimeSynced).mergeTimeline(metaClient.getActiveTimeline())
: metaClient.getActiveTimeline();
return timeline.getCommitsTimeline().findInstantsAfter(lastCommitTimeSynced, Integer.MAX_VALUE);
}

/**
* Returns the commit metadata of the given instant.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ public boolean isBootstrap() {
* Going through archive timeline is a costly operation, and it should be avoided unless some start time is given.
*/
public Set<String> getDroppedPartitionsSince(Option<String> lastCommitTimeSynced) {
HoodieTimeline timeline = lastCommitTimeSynced.isPresent() ? metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
.mergeTimeline(metaClient.getActiveTimeline())
.getCommitsTimeline()
.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE) : metaClient.getActiveTimeline();
HoodieTimeline timeline = lastCommitTimeSynced.isPresent()
? TimelineUtils.getIncSyncTimeline(metaClient, lastCommitTimeSynced.get())
: metaClient.getActiveTimeline();
return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
}

Expand Down Expand Up @@ -125,11 +124,7 @@ public List<String> getWrittenPartitionsSince(Option<String> lastCommitTimeSynce
config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
} else {
LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
return TimelineUtils.getWrittenPartitions(
metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
.mergeTimeline(metaClient.getActiveTimeline())
.getCommitsTimeline()
.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
return TimelineUtils.getWrittenPartitions(TimelineUtils.getIncSyncTimeline(metaClient, lastCommitTimeSynced.get()));
}
}

Expand Down