[HUDI-4832] Fix drop partition meta sync (#6662)
codope committed Sep 19, 2022
Parent: 1b2792c · Commit: 8962946
Showing 8 changed files with 84 additions and 61 deletions.
@@ -26,6 +26,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -52,11 +53,34 @@ public class TimelineUtils {
* Returns partitions that have new data strictly after commitTime.
* Does not include internal operations such as clean in the timeline.
*/
-  public static List<String> getPartitionsWritten(HoodieTimeline timeline) {
+  public static List<String> getWrittenPartitions(HoodieTimeline timeline) {
HoodieTimeline timelineToSync = timeline.getWriteTimeline();
return getAffectedPartitions(timelineToSync);
}

+  /**
+   * Returns partitions that have been deleted or marked for deletion in the given timeline.
+   * Does not include internal operations such as clean in the timeline.
+   */
+  public static List<String> getDroppedPartitions(HoodieTimeline timeline) {
+    HoodieTimeline replaceCommitTimeline = timeline.getWriteTimeline().filterCompletedInstants().getCompletedReplaceTimeline();
+
+    return replaceCommitTimeline.getInstants().flatMap(instant -> {
+      try {
+        HoodieReplaceCommitMetadata commitMetadata = HoodieReplaceCommitMetadata.fromBytes(
+            replaceCommitTimeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
+        if (WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) {
+          Map<String, List<String>> partitionToReplaceFileIds = commitMetadata.getPartitionToReplaceFileIds();
+          return partitionToReplaceFileIds.keySet().stream();
+        } else {
+          return Stream.empty();
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to get partitions modified at " + instant, e);
+      }
+    }).distinct().filter(partition -> !partition.isEmpty()).collect(Collectors.toList());
+  }

/**
* Returns partitions that have been modified including internal operations such as clean in the passed timeline.
*/
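A minimal usage sketch of the new helper (not part of the change set; the `Configuration` and base path are placeholder assumptions, and `getDroppedPartitions` itself narrows the passed timeline to completed replace commits):

```java
import org.apache.hadoop.conf.Configuration;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.TimelineUtils;

import java.util.List;

public class DroppedPartitionsExample {
  public static void main(String[] args) {
    // Hypothetical table location; point this at a real Hudi base path.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(new Configuration())
        .setBasePath("/tmp/hudi_trips")
        .build();
    // Scans completed replace commits and keeps partitions touched by DELETE_PARTITION.
    List<String> dropped = TimelineUtils.getDroppedPartitions(metaClient.getActiveTimeline());
    dropped.forEach(partition -> System.out.println("dropped: " + partition));
  }
}
```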
@@ -1357,22 +1357,4 @@ public static Set<String> getInflightAndCompletedMetadataPartitions(HoodieTableC
inflightAndCompletedPartitions.addAll(tableConfig.getMetadataPartitions());
return inflightAndCompletedPartitions;
}
-
-  /**
-   * Get Last commit's Metadata.
-   */
-  public static Option<HoodieCommitMetadata> getLatestCommitMetadata(HoodieTableMetaClient metaClient) {
-    try {
-      HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-      if (timeline.lastInstant().isPresent()) {
-        HoodieInstant instant = timeline.lastInstant().get();
-        byte[] data = timeline.getInstantDetails(instant).get();
-        return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
-      } else {
-        return Option.empty();
-      }
-    } catch (Exception e) {
-      throw new HoodieException("Failed to get commit metadata", e);
-    }
-  }
}
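Context on why this helper became removable: `HoodieSyncClient.isDropPartition()` (deleted further down) was built on it, and since it surfaces only the single latest completed commit, a DELETE_PARTITION replace commit followed by any later commit before the next sync went undetected; that is the gap this commit closes. A sketch of the pre-commit check, written against the code as it stood before this change:

```java
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;

public class OldDropDetectionSketch {
  // Pre-commit behavior: with a timeline like
  //   101.replacecommit (DELETE_PARTITION), 102.commit (upsert)
  // this returns false, because only instant 102 is inspected.
  static boolean wasDropDetected(HoodieTableMetaClient metaClient) {
    Option<HoodieCommitMetadata> latest = HoodieTableMetadataUtil.getLatestCommitMetadata(metaClient);
    return latest.isPresent()
        && WriteOperationType.DELETE_PARTITION.equals(latest.get().getOperationType());
  }
}
```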
@@ -130,11 +130,11 @@ public void testGetPartitions() throws IOException {
assertEquals(partitions, Arrays.asList(new String[] {"0", "2", "3", "4"}));

// verify only commit actions
-    partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
+    partitions = TimelineUtils.getWrittenPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
assertEquals(4, partitions.size());
assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"}));

-    partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
+    partitions = TimelineUtils.getWrittenPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
assertEquals(3, partitions.size());
assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"}));
}
@@ -194,7 +194,7 @@ private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, b
if (config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) {
writtenPartitionsSince = new ArrayList<>();
} else {
-      writtenPartitionsSince = syncClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
+      writtenPartitionsSince = syncClient.getWrittenPartitionsSince(lastCommitTimeSynced);
}
LOG.info("Scan partitions complete, partitionNum:{}", writtenPartitionsSince.size());

@@ -43,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE;
@@ -199,9 +200,6 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
// Check if the necessary table exists
boolean tableExists = syncClient.tableExists(tableName);

-    // check if isDropPartition
-    boolean isDropPartition = syncClient.isDropPartition();
-
// Get the parquet schema for this table looking at the latest commit
MessageType schema = syncClient.getStorageSchema();

@@ -225,11 +223,13 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
lastCommitTimeSynced = syncClient.getLastCommitTimeSynced(tableName);
}
LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));
-    List<String> writtenPartitionsSince = syncClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
+    List<String> writtenPartitionsSince = syncClient.getWrittenPartitionsSince(lastCommitTimeSynced);
LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());

// Sync the partitions if needed
-    boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDropPartition);
+    // find dropped partitions, if any, in the latest commit
+    Set<String> droppedPartitions = syncClient.getDroppedPartitionsSince(lastCommitTimeSynced);
+    boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, droppedPartitions);
boolean meetSyncConditions = schemaChanged || partitionsChanged;
if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) {
syncClient.updateLastCommitTimeSynced(tableName);
@@ -310,12 +310,12 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
* Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the
* partition path does not match, it updates the partition path).
*/
-  private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince, boolean isDropPartition) {
+  private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince, Set<String> droppedPartitions) {
boolean partitionsChanged;
try {
List<Partition> hivePartitions = syncClient.getAllPartitions(tableName);
List<PartitionEvent> partitionEvents =
-          syncClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, isDropPartition);
+          syncClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, droppedPartitions);

List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
if (!newPartitions.isEmpty()) {
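Condensed, the reworked partition sync in `syncHoodieTable` now reads roughly as follows (a sketch, not the committed method; the wrapper method name is invented, while `syncClient`, `tableName`, and `syncPartitions(...)` are the members shown above):

```java
// Sketch of the reworked flow inside HiveSyncTool.
private boolean syncPartitionsIfNeeded(String tableName) {
  Option<String> lastCommitTimeSynced = syncClient.getLastCommitTimeSynced(tableName);
  List<String> writtenPartitionsSince = syncClient.getWrittenPartitionsSince(lastCommitTimeSynced);
  // New: per-partition drop information from replace commits since the last sync,
  // replacing the old table-wide isDropPartition flag.
  Set<String> droppedPartitions = syncClient.getDroppedPartitionsSince(lastCommitTimeSynced);
  return syncPartitions(tableName, writtenPartitionsSince, droppedPartitions);
}
```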
@@ -197,8 +197,8 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode)
+ "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'");

hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
-    List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
-    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false);
+    List<String> writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.empty());
+    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet());
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType,
"The one partition event must of type UPDATE");
@@ -475,10 +475,10 @@ public void testSyncIncremental(String syncMode) throws Exception {

// Lets do the sync
reSyncHiveTable();
-    List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1));
+    List<String> writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.of(commitTime1));
assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
List<org.apache.hudi.sync.common.model.Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
-    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false);
+    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet());
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD");

@@ -754,10 +754,10 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception {
HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2);

reinitHiveSyncClient();
-    List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime));
+    List<String> writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.of(instantTime));
assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
List<org.apache.hudi.sync.common.model.Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
-    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false);
+    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet());
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD");

@@ -784,7 +784,7 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception {
"Table partitions should match the number of partitions we wrote");
assertEquals(commitTime3, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
"The last commit that was synced should be updated in the TBLPROPERTIES");
-    assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size());
+    assertEquals(1, hiveClient.getWrittenPartitionsSince(Option.of(commitTime2)).size());
}

@ParameterizedTest
@@ -854,17 +854,33 @@ public void testDropPartition(String syncMode) throws Exception {
"Table partitions should match the number of partitions we wrote");
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
"The last commit that was synced should be updated in the TBLPROPERTIES");
+    // add a partition but do not sync
+    String instantTime2 = "101";
+    String newPartition = "2010/02/01";
+    HiveTestUtil.addCOWPartition(newPartition, true, true, instantTime2);
+    HiveTestUtil.getCreatedTablesSet().add(HiveTestUtil.DB_NAME + "." + HiveTestUtil.TABLE_NAME);
+    partitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+    assertEquals(1, partitions.size(),
+        "Table partitions should match the number of partitions we wrote");
+    assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
+        "The last commit that was synced should be updated in the TBLPROPERTIES");

+    // create two replace commits to delete current partitions, but do not sync in between
String partitiontoDelete = partitions.get(0).getValues().get(0).replace("-", "/");
-    // create a replace commit to delete current partitions
-    HiveTestUtil.createReplaceCommit("101", partitiontoDelete, WriteOperationType.DELETE_PARTITION, true, true);
+    String instantTime3 = "102";
+    HiveTestUtil.createReplaceCommit(instantTime3, partitiontoDelete, WriteOperationType.DELETE_PARTITION, true, true);
+    String instantTime4 = "103";
+    HiveTestUtil.createReplaceCommit(instantTime4, newPartition, WriteOperationType.DELETE_PARTITION, true, true);

-    // sync drop partitions
+    // now run hive sync
reinitHiveSyncClient();
reSyncHiveTable();

List<Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
assertEquals(0, hivePartitions.size(),
"Table should have 0 partition because of the drop the only one partition");
"Table should have no partitions");
assertEquals(instantTime4, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
"The last commit that was synced should be updated in the TBLPROPERTIES");
}

@ParameterizedTest
@@ -493,7 +493,7 @@ public static void createCommitFile(HoodieCommitMetadata commitMetadata, String
fsout.close();
}

-  public static void createReplaceCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException {
+  public static void createReplaceCommitFile(HoodieReplaceCommitMetadata commitMetadata, String instantTime) throws IOException {
byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeReplaceFileName(instantTime));
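The narrowed parameter type matters because `TimelineUtils.getDroppedPartitions` deserializes instant details as `HoodieReplaceCommitMetadata` and walks `getPartitionToReplaceFileIds()`, a map that plain `HoodieCommitMetadata` does not carry. A sketch of the metadata shape such a commit serializes (partition and file id values are hypothetical):

```java
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;

public class ReplaceMetadataSketch {
  static HoodieReplaceCommitMetadata deletePartitionMetadata() {
    HoodieReplaceCommitMetadata metadata = new HoodieReplaceCommitMetadata();
    metadata.setOperationType(WriteOperationType.DELETE_PARTITION);
    // getDroppedPartitions() returns the keys of this partition -> replaced-file-ids map.
    metadata.addReplaceFileId("2010/01/01", "hypothetical-file-id");
    return metadata;
  }
}
```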
@@ -20,16 +20,13 @@

import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.sync.common.model.Partition;
import org.apache.hudi.sync.common.model.PartitionEvent;
import org.apache.hudi.sync.common.model.PartitionValueExtractor;
@@ -41,8 +38,10 @@

import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;

import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
@@ -83,18 +82,17 @@ public boolean isBootstrap() {
return metaClient.getTableConfig().getBootstrapBasePath().isPresent();
}

-  public boolean isDropPartition() {
-    try {
-      Option<HoodieCommitMetadata> hoodieCommitMetadata = HoodieTableMetadataUtil.getLatestCommitMetadata(metaClient);
-
-      if (hoodieCommitMetadata.isPresent()
-          && WriteOperationType.DELETE_PARTITION.equals(hoodieCommitMetadata.get().getOperationType())) {
-        return true;
-      }
-    } catch (Exception e) {
-      throw new HoodieSyncException("Failed to get commit metadata", e);
-    }
-    return false;
-  }
+  /**
+   * Get the set of dropped partitions since the last synced commit.
+   * If last sync time is not known then consider only active timeline.
+   * Going through archive timeline is a costly operation, and it should be avoided unless some start time is given.
+   */
+  public Set<String> getDroppedPartitionsSince(Option<String> lastCommitTimeSynced) {
+    HoodieTimeline timeline = lastCommitTimeSynced.isPresent() ? metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
+        .mergeTimeline(metaClient.getActiveTimeline())
+        .getCommitsTimeline()
+        .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE) : metaClient.getActiveTimeline();
+    return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
+  }

@Override
@@ -106,7 +104,7 @@ public MessageType getStorageSchema() {
}
}

-  public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
+  public List<String> getWrittenPartitionsSince(Option<String> lastCommitTimeSynced) {
if (!lastCommitTimeSynced.isPresent()) {
LOG.info("Last commit time synced is not known, listing all partitions in "
+ config.getString(META_SYNC_BASE_PATH)
@@ -118,16 +116,19 @@ public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSyn
config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
} else {
LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
-      return TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().getCommitsTimeline()
-          .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
+      return TimelineUtils.getWrittenPartitions(
+          metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
+              .mergeTimeline(metaClient.getActiveTimeline())
+              .getCommitsTimeline()
+              .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
}
}

/**
* Iterate over the storage partitions and find if there are any new partitions that need to be added or updated.
* Generate a list of PartitionEvent based on the changes required.
*/
-  public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, boolean isDropPartition) {
+  public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, Set<String> droppedPartitions) {
Map<String, String> paths = new HashMap<>();
for (Partition tablePartition : tablePartitions) {
List<String> hivePartitionValues = tablePartition.getValues();
@@ -143,7 +144,7 @@ public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions,
// Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);

-      if (isDropPartition) {
+      if (droppedPartitions.contains(storagePartition)) {
events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
} else {
if (!storagePartitionValues.isEmpty()) {
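End to end, the reworked `HoodieSyncClient` surface composes as in this sketch (the last-synced instant and the catalog partition list are assumed inputs):

```java
import org.apache.hudi.common.util.Option;
import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hudi.sync.common.model.Partition;
import org.apache.hudi.sync.common.model.PartitionEvent;

import java.util.List;
import java.util.Set;

public class PartitionEventsSketch {
  static List<PartitionEvent> eventsSince(HoodieSyncClient client, List<Partition> catalogPartitions) {
    Option<String> lastSynced = Option.of("100"); // hypothetical last synced instant
    List<String> written = client.getWrittenPartitionsSince(lastSynced);
    Set<String> dropped = client.getDroppedPartitionsSince(lastSynced);
    // Partitions in `dropped` become DROP events; the rest become ADD or UPDATE as before.
    return client.getPartitionEvents(catalogPartitions, written, dropped);
  }
}
```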
