Address feedback, remove unused method, rename methods
codope committed Sep 19, 2022
1 parent a6b676c commit 0577454
Showing 6 changed files with 15 additions and 38 deletions.
@@ -53,7 +53,7 @@ public class TimelineUtils {
* Returns partitions that have new data strictly after commitTime.
* Does not include internal operations such as clean in the timeline.
*/
public static List<String> getPartitionsWritten(HoodieTimeline timeline) {
public static List<String> getWrittenPartitions(HoodieTimeline timeline) {
HoodieTimeline timelineToSync = timeline.getWriteTimeline();
return getAffectedPartitions(timelineToSync);
}
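
For reference, a minimal caller-side sketch of the renamed helper. The wrapper class, method name, and metaClient parameter below are illustrative assumptions and not part of this commit; TimelineUtils.getWrittenPartitions and the timeline calls are the ones shown in this diff.

import java.util.List;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.TimelineUtils;

class WrittenPartitionsSketch {
  // Partitions touched by write actions strictly after the given commit time.
  static List<String> writtenAfter(HoodieTableMetaClient metaClient, String commitTime) {
    // findInstantsAfter(commitTime, numCommits) is used the same way in the tests below.
    return TimelineUtils.getWrittenPartitions(
        metaClient.getActiveTimeline().findInstantsAfter(commitTime, Integer.MAX_VALUE));
  }
}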
@@ -62,7 +62,7 @@ public static List<String> getPartitionsWritten(HoodieTimeline timeline) {
* Returns partitions that have been deleted or marked for deletion in the given timeline.
* Does not include internal operations such as clean in the timeline.
*/
public static List<String> getPartitionsDropped(HoodieTimeline timeline) {
public static List<String> getDroppedPartitions(HoodieTimeline timeline) {
HoodieTimeline replaceCommitTimeline = timeline.getWriteTimeline().filterCompletedInstants().getCompletedReplaceTimeline();

return replaceCommitTimeline.getInstants().flatMap(instant -> {
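A similar hedged sketch for the renamed getDroppedPartitions; again, the surrounding class and method are only for illustration, while the call itself matches the signature above.

import java.util.List;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.TimelineUtils;

class DroppedPartitionsSketch {
  // Partitions removed by completed replace commits in the table's active timeline.
  static List<String> droppedSoFar(HoodieTableMetaClient metaClient) {
    return TimelineUtils.getDroppedPartitions(metaClient.getActiveTimeline());
  }
}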
@@ -130,11 +130,11 @@ public void testGetPartitions() throws IOException {
assertEquals(partitions, Arrays.asList(new String[] {"0", "2", "3", "4"}));

// verify only commit actions
partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
partitions = TimelineUtils.getWrittenPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
assertEquals(4, partitions.size());
assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"}));

partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
partitions = TimelineUtils.getWrittenPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
assertEquals(3, partitions.size());
assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"}));
}
@@ -194,7 +194,7 @@ private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, b
if (config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) {
writtenPartitionsSince = new ArrayList<>();
} else {
writtenPartitionsSince = syncClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
writtenPartitionsSince = syncClient.getWrittenPartitionsSince(lastCommitTimeSynced);
}
LOG.info("Scan partitions complete, partitionNum:{}", writtenPartitionsSince.size());

@@ -223,12 +223,12 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
lastCommitTimeSynced = syncClient.getLastCommitTimeSynced(tableName);
}
LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));
List<String> writtenPartitionsSince = syncClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
List<String> writtenPartitionsSince = syncClient.getWrittenPartitionsSince(lastCommitTimeSynced);
LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());

// Sync the partitions if needed
// find dropped partitions, if any, in the latest commit
Set<String> droppedPartitions = syncClient.getDroppedPartitions(lastCommitTimeSynced);
Set<String> droppedPartitions = syncClient.getDroppedPartitionsSince(lastCommitTimeSynced);
boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, droppedPartitions);
boolean meetSyncConditions = schemaChanged || partitionsChanged;
if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) {
@@ -197,7 +197,7 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode)
+ "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'");

hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
List<String> writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.empty());
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet());
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType,
@@ -475,7 +475,7 @@ public void testSyncIncremental(String syncMode) throws Exception {

// Lets do the sync
reSyncHiveTable();
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1));
List<String> writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.of(commitTime1));
assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
List<org.apache.hudi.sync.common.model.Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet());
@@ -754,7 +754,7 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception {
HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2);

reinitHiveSyncClient();
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime));
List<String> writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.of(instantTime));
assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
List<org.apache.hudi.sync.common.model.Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet());
@@ -784,7 +784,7 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception {
"Table partitions should match the number of partitions we wrote");
assertEquals(commitTime3, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
"The last commit that was synced should be updated in the TBLPROPERTIES");
assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size());
assertEquals(1, hiveClient.getWrittenPartitionsSince(Option.of(commitTime2)).size());
}

@ParameterizedTest
@@ -20,17 +20,13 @@

import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sync.common.model.Partition;
import org.apache.hudi.sync.common.model.PartitionEvent;
import org.apache.hudi.sync.common.model.PartitionValueExtractor;
@@ -91,12 +87,12 @@ public boolean isBootstrap() {
* If the last sync time is not known, consider only the active timeline.
* Going through the archived timeline is a costly operation, and it should be avoided unless some start time is given.
*/
public Set<String> getDroppedPartitions(Option<String> lastCommitTimeSynced) {
public Set<String> getDroppedPartitionsSince(Option<String> lastCommitTimeSynced) {
HoodieTimeline timeline = lastCommitTimeSynced.isPresent() ? metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
.mergeTimeline(metaClient.getActiveTimeline())
.getCommitsTimeline()
.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE) : metaClient.getActiveTimeline();
return new HashSet<>(TimelineUtils.getPartitionsDropped(timeline));
return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
}
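
A caller-side sketch of getDroppedPartitionsSince in its two modes, with and without a last sync time. The org.apache.hudi.sync.common.HoodieSyncClient import path, the helper class, and the variable names are assumptions inferred from this file's other imports and are not confirmed by the diff.

import java.util.Set;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.sync.common.HoodieSyncClient;

class DroppedPartitionsSinceSketch {
  // With a known last sync time, the archived timeline is merged in and bounded by that time.
  static Set<String> droppedSince(HoodieSyncClient syncClient, String lastCommitTimeSynced) {
    return syncClient.getDroppedPartitionsSince(Option.of(lastCommitTimeSynced));
  }

  // Without one, only the active timeline is scanned, avoiding a costly archive scan.
  static Set<String> droppedFromActiveTimeline(HoodieSyncClient syncClient) {
    return syncClient.getDroppedPartitionsSince(Option.empty());
  }
}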

@Override
@@ -108,7 +104,7 @@ public MessageType getStorageSchema() {
}
}

public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
public List<String> getWrittenPartitionsSince(Option<String> lastCommitTimeSynced) {
if (!lastCommitTimeSynced.isPresent()) {
LOG.info("Last commit time synced is not known, listing all partitions in "
+ config.getString(META_SYNC_BASE_PATH)
@@ -120,7 +116,7 @@ public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSyn
config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
} else {
LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
return TimelineUtils.getPartitionsWritten(
return TimelineUtils.getWrittenPartitions(
metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
.mergeTimeline(metaClient.getActiveTimeline())
.getCommitsTimeline()
@@ -163,23 +159,4 @@ public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions,
}
return events;
}

/**
* Get Last commit's Metadata.
*/
private static Option<HoodieCommitMetadata> getLatestCommitMetadata(HoodieTableMetaClient metaClient) {
try {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
if (timeline.lastInstant().isPresent()) {
HoodieInstant instant = timeline.lastInstant().get();
byte[] data = timeline.getInstantDetails(instant).get();
return HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction()) ? Option.of(HoodieReplaceCommitMetadata.fromBytes(data, HoodieReplaceCommitMetadata.class)) :
Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
} else {
return Option.empty();
}
} catch (Exception e) {
throw new HoodieException("Failed to get commit metadata", e);
}
}
}
