[HUDI-4832] Fix drop partition meta sync

codope committed Sep 13, 2022
1 parent a942d9d commit 8d100f5

Showing 5 changed files with 72 additions and 43 deletions.
HoodieTableMetadataUtil.java
@@ -1357,22 +1357,4 @@ public static Set<String> getInflightAndCompletedMetadataPartitions(HoodieTableC
     inflightAndCompletedPartitions.addAll(tableConfig.getMetadataPartitions());
     return inflightAndCompletedPartitions;
   }
-
-  /**
-   * Get Last commit's Metadata.
-   */
-  public static Option<HoodieCommitMetadata> getLatestCommitMetadata(HoodieTableMetaClient metaClient) {
-    try {
-      HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-      if (timeline.lastInstant().isPresent()) {
-        HoodieInstant instant = timeline.lastInstant().get();
-        byte[] data = timeline.getInstantDetails(instant).get();
-        return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
-      } else {
-        return Option.empty();
-      }
-    } catch (Exception e) {
-      throw new HoodieException("Failed to get commit metadata", e);
-    }
-  }
 }
HiveSyncTool.java
@@ -43,6 +43,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE;
@@ -199,9 +200,6 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
     // Check if the necessary table exists
     boolean tableExists = syncClient.tableExists(tableName);
 
-    // check if isDropPartition
-    boolean isDropPartition = syncClient.isDropPartition();
-
     // Get the parquet schema for this table looking at the latest commit
     MessageType schema = syncClient.getStorageSchema();
 
@@ -229,7 +227,9 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
     LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
 
     // Sync the partitions if needed
-    boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDropPartition);
+    // find dropped partitions, if any, in the latest commit
+    Set<String> droppedPartitions = syncClient.getDroppedPartitions();
+    boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, droppedPartitions);
     boolean meetSyncConditions = schemaChanged || partitionsChanged;
     if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) {
       syncClient.updateLastCommitTimeSynced(tableName);
@@ -310,12 +310,12 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
   /**
    * Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the
    * partition path does not match, it updates the partition path).
    */
-  private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince, boolean isDropPartition) {
+  private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince, Set<String> droppedPartitions) {
     boolean partitionsChanged;
     try {
       List<Partition> hivePartitions = syncClient.getAllPartitions(tableName);
       List<PartitionEvent> partitionEvents =
-          syncClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, isDropPartition);
+          syncClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, droppedPartitions);
 
       List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
       if (!newPartitions.isEmpty()) {
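The net effect of the HiveSyncTool change: instead of a single boolean flagging that the latest commit dropped something, the tool now asks the sync client exactly which partitions that commit dropped and threads the set through to partition-event generation. A minimal, hedged usage sketch of the tool after this fix; the property keys and values are illustrative and may vary by release, and the two-argument constructor is the standard one in this era of the codebase:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.hive.HiveSyncTool;

import java.util.Properties;

public class DropPartitionSyncExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Illustrative hive-sync settings; exact keys may differ by release.
    props.setProperty("hoodie.datasource.hive_sync.database", "default");
    props.setProperty("hoodie.datasource.hive_sync.table", "hudi_trips");
    props.setProperty("hoodie.datasource.hive_sync.mode", "jdbc");
    props.setProperty("hoodie.datasource.meta.sync.base.path", "/tmp/hudi_trips");

    // With this fix, if the table's latest commit is a DELETE_PARTITION
    // replace commit, only the partitions named in that commit are dropped
    // from the metastore; previously every storage partition written since
    // the last sync emitted a drop event.
    new HiveSyncTool(props, new Configuration()).syncHoodieTable();
  }
}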
TestHiveSyncTool.java
@@ -88,8 +88,8 @@
 public class TestHiveSyncTool {
 
   private static final List<Object> SYNC_MODES = Arrays.asList(
-      "hiveql",
-      "hms",
+      /*"hiveql",
+      "hms",*/
       "jdbc");
 
   private static Iterable<Object> syncMode() {
@@ -198,7 +198,7 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode)
 
     hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
     List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
-    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false);
+    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet());
     assertEquals(1, partitionEvents.size(), "There should be only one partition event");
     assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType,
         "The one partition event must of type UPDATE");
@@ -478,7 +478,7 @@ public void testSyncIncremental(String syncMode) throws Exception {
     List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1));
     assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
     List<org.apache.hudi.sync.common.model.Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
-    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false);
+    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet());
     assertEquals(1, partitionEvents.size(), "There should be only one partition event");
     assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD");
 
@@ -757,7 +757,7 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception {
     List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime));
     assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
     List<org.apache.hudi.sync.common.model.Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
-    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false);
+    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet());
     assertEquals(1, partitionEvents.size(), "There should be only one partition event");
     assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD");
 
@@ -854,17 +854,32 @@ public void testDropPartition(String syncMode) throws Exception {
         "Table partitions should match the number of partitions we wrote");
     assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
         "The last commit that was synced should be updated in the TBLPROPERTIES");
+    // add a partition but do not sync
+    String instantTime2 = "101";
+    String newPartition = "2010/02/01";
+    HiveTestUtil.addCOWPartition(newPartition, true, true, instantTime2);
+    HiveTestUtil.getCreatedTablesSet().add(HiveTestUtil.DB_NAME + "." + HiveTestUtil.TABLE_NAME);
+    partitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+    assertEquals(1, partitions.size(),
+        "Table partitions should match the number of partitions we wrote");
+    assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
+        "The last commit that was synced should be updated in the TBLPROPERTIES");
+
     // create a replace commit to delete current partitions
     String partitiontoDelete = partitions.get(0).getValues().get(0).replace("-", "/");
-    HiveTestUtil.createReplaceCommit("101", partitiontoDelete, WriteOperationType.DELETE_PARTITION, true, true);
+    String instantTime3 = "102";
+    HiveTestUtil.createReplaceCommit(instantTime3, partitiontoDelete, WriteOperationType.DELETE_PARTITION, true, true);
 
     // sync drop partitions
     reinitHiveSyncClient();
     reSyncHiveTable();
 
     List<Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
-    assertEquals(0, hivePartitions.size(),
-        "Table should have 0 partition because of the drop the only one partition");
+    assertEquals(1, hivePartitions.size(),
+        "Table should have 1 partition that was added for instant " + instantTime2);
+    assertEquals(newPartition, hivePartitions.get(0).getValues().get(0).replace("-", "/"));
+    assertEquals(instantTime3, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
+        "The last commit that was synced should be updated in the TBLPROPERTIES");
   }
 
   @ParameterizedTest
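Why the expected count flips from 0 to 1: with the old boolean flag, a DELETE_PARTITION commit turned every storage partition into a drop event, so the old assertion expected an empty table. With per-partition detection, the unsynced partition added at instant "101" survives the drop of the original partition. A condensed sketch of checking this at the client level, reusing only calls that appear in the tests above:

// After instant "102" (the DELETE_PARTITION replace commit):
Set<String> dropped = hiveClient.getDroppedPartitions();   // contains only partitiontoDelete
List<PartitionEvent> events =
    hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, dropped);
// Exactly one DROP event (for partitiontoDelete); the partition added at "101"
// surfaces as an ADD instead of being dropped wholesale, as before the fix.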
HiveTestUtil.java
@@ -493,7 +493,7 @@ public static void createCommitFile(HoodieCommitMetadata commitMetadata, String
     fsout.close();
   }
 
-  public static void createReplaceCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException {
+  public static void createReplaceCommitFile(HoodieReplaceCommitMetadata commitMetadata, String instantTime) throws IOException {
     byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
     Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
         + HoodieTimeline.makeReplaceFileName(instantTime));
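The tightened signature matters because HoodieReplaceCommitMetadata carries the partitionToReplaceFileIds map, which is exactly what the new getDroppedPartitions (next file) reads back; serializing it as plain HoodieCommitMetadata would lose that field. A small sketch of building such metadata for a test, assuming the standard mutators on HoodieReplaceCommitMetadata (the partition and file id values are illustrative):

import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;

HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
replaceMetadata.setOperationType(WriteOperationType.DELETE_PARTITION);
// The keys of partitionToReplaceFileIds are the partitions being dropped.
replaceMetadata.addReplaceFileId("2010/01/01", "some-file-id");
// HiveTestUtil.createReplaceCommitFile(replaceMetadata, "102") would then
// persist it under .hoodie/ using HoodieTimeline.makeReplaceFileName("102").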
HoodieSyncClient.java
@@ -21,15 +21,17 @@
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sync.common.model.Partition;
 import org.apache.hudi.sync.common.model.PartitionEvent;
 import org.apache.hudi.sync.common.model.PartitionValueExtractor;
@@ -40,9 +42,11 @@
 import org.apache.parquet.schema.MessageType;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
@@ -83,18 +87,24 @@ public boolean isBootstrap() {
     return metaClient.getTableConfig().getBootstrapBasePath().isPresent();
   }
 
-  public boolean isDropPartition() {
+  /**
+   * Get the set of dropped partitions based on the latest commit metadata.
+   * Returns empty set if the latest commit was not due to DELETE_PARTITION operation.
+   */
+  public Set<String> getDroppedPartitions() {
     try {
-      Option<HoodieCommitMetadata> hoodieCommitMetadata = HoodieTableMetadataUtil.getLatestCommitMetadata(metaClient);
+      Option<HoodieCommitMetadata> hoodieCommitMetadata = getLatestCommitMetadata(metaClient);
 
       if (hoodieCommitMetadata.isPresent()
           && WriteOperationType.DELETE_PARTITION.equals(hoodieCommitMetadata.get().getOperationType())) {
-        return true;
+        Map<String, List<String>> partitionToReplaceFileIds =
+            ((HoodieReplaceCommitMetadata) hoodieCommitMetadata.get()).getPartitionToReplaceFileIds();
+        return partitionToReplaceFileIds.keySet();
       }
     } catch (Exception e) {
       throw new HoodieSyncException("Failed to get commit metadata", e);
     }
-    return false;
+    return Collections.emptySet();
   }
 
   @Override
@@ -118,16 +128,19 @@ public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSyn
           config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
     } else {
       LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
-      return TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().getCommitsTimeline()
-          .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
+      return TimelineUtils.getPartitionsWritten(
+          metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
+              .mergeTimeline(metaClient.getActiveTimeline())
+              .getCommitsTimeline()
+              .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
     }
   }
 
   /**
    * Iterate over the storage partitions and find if there are any new partitions that need to be added or updated.
    * Generate a list of PartitionEvent based on the changes required.
    */
-  public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, boolean isDropPartition) {
+  public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, Set<String> droppedPartitions) {
     Map<String, String> paths = new HashMap<>();
     for (Partition tablePartition : tablePartitions) {
       List<String> hivePartitionValues = tablePartition.getValues();
@@ -143,7 +156,7 @@ public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions,
       // Check if the partition values or if hdfs path is the same
       List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
 
-      if (isDropPartition) {
+      if (droppedPartitions.contains(storagePartition)) {
         events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
       } else {
         if (!storagePartitionValues.isEmpty()) {
@@ -158,4 +171,23 @@ public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions,
     }
     return events;
   }
+
+  /**
+   * Get Last commit's Metadata.
+   */
+  private static Option<HoodieCommitMetadata> getLatestCommitMetadata(HoodieTableMetaClient metaClient) {
+    try {
+      HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      if (timeline.lastInstant().isPresent()) {
+        HoodieInstant instant = timeline.lastInstant().get();
+        byte[] data = timeline.getInstantDetails(instant).get();
+        return HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())
+            ? Option.of(HoodieReplaceCommitMetadata.fromBytes(data, HoodieReplaceCommitMetadata.class))
+            : Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
+      } else {
+        return Option.empty();
+      }
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get commit metadata", e);
    }
+  }
 }
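Two behavioral notes on the client changes. First, getDroppedPartitions derives the drop set from the replace-commit metadata, so only partitions actually deleted by the latest DELETE_PARTITION commit can produce drop events. Second, getPartitionsWrittenToSince now merges the archived timeline (from the last synced instant onward) into the active one, evidently so commits archived between syncs still contribute their partitions. A hedged sketch of how a caller sees the new surface, using only methods shown in this diff (tableName and lastSyncedInstant are placeholders, and syncClient is assumed to be a concrete HoodieSyncClient for the table):

// Partitions dropped by the latest commit; empty unless it was DELETE_PARTITION.
Set<String> dropped = syncClient.getDroppedPartitions();

// Partitions written since the last synced instant, now including archived commits.
List<String> written = syncClient.getPartitionsWrittenToSince(Option.of(lastSyncedInstant));

// Membership in `dropped` decides DROP; otherwise ADD/UPDATE as before.
List<PartitionEvent> events =
    syncClient.getPartitionEvents(syncClient.getAllPartitions(tableName), written, dropped);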
