[HUDI-4832] Fix drop partition meta sync #6662

Merged (4 commits, Sep 19, 2022)
Changes from all commits
@@ -26,6 +26,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -52,11 +53,34 @@ public class TimelineUtils {
* Returns partitions that have new data strictly after commitTime.
* Does not include internal operations such as clean in the timeline.
*/
- public static List<String> getPartitionsWritten(HoodieTimeline timeline) {
+ public static List<String> getWrittenPartitions(HoodieTimeline timeline) {
HoodieTimeline timelineToSync = timeline.getWriteTimeline();
return getAffectedPartitions(timelineToSync);
}

+ /**
+  * Returns partitions that have been deleted or marked for deletion in the given timeline.
+  * Does not include internal operations such as clean in the timeline.
+  */
+ public static List<String> getDroppedPartitions(HoodieTimeline timeline) {
+   HoodieTimeline replaceCommitTimeline = timeline.getWriteTimeline().filterCompletedInstants().getCompletedReplaceTimeline();
+
+   return replaceCommitTimeline.getInstants().flatMap(instant -> {
+     try {
+       HoodieReplaceCommitMetadata commitMetadata = HoodieReplaceCommitMetadata.fromBytes(
+           replaceCommitTimeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
+       if (WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) {
+         Map<String, List<String>> partitionToReplaceFileIds = commitMetadata.getPartitionToReplaceFileIds();
+         return partitionToReplaceFileIds.keySet().stream();
+       } else {
+         return Stream.empty();
+       }
+     } catch (IOException e) {
+       throw new HoodieIOException("Failed to get partitions modified at " + instant, e);
+     }
+   }).distinct().filter(partition -> !partition.isEmpty()).collect(Collectors.toList());
+ }
+
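
A minimal usage sketch for the new helper; hadoopConf and basePath here are hypothetical placeholders, not part of this change:

    // Build a meta client for the table, then scan its active timeline for
    // partitions dropped via DELETE_PARTITION replace commits.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(hadoopConf)
        .setBasePath(basePath)
        .build();
    List<String> dropped = TimelineUtils.getDroppedPartitions(metaClient.getActiveTimeline());

Only completed replace commits are inspected, and replace commits with any other operation type (clustering, for example) contribute nothing.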
/**
* Returns partitions that have been modified including internal operations such as clean in the passed timeline.
*/
@@ -1357,22 +1357,4 @@ public static Set<String> getInflightAndCompletedMetadataPartitions(HoodieTableC
inflightAndCompletedPartitions.addAll(tableConfig.getMetadataPartitions());
return inflightAndCompletedPartitions;
}

- /**
-  * Get Last commit's Metadata.
-  */
- public static Option<HoodieCommitMetadata> getLatestCommitMetadata(HoodieTableMetaClient metaClient) {
-   try {
-     HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-     if (timeline.lastInstant().isPresent()) {
-       HoodieInstant instant = timeline.lastInstant().get();
-       byte[] data = timeline.getInstantDetails(instant).get();
-       return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
-     } else {
-       return Option.empty();
-     }
-   } catch (Exception e) {
-     throw new HoodieException("Failed to get commit metadata", e);
-   }
- }
}
@@ -130,11 +130,11 @@ public void testGetPartitions() throws IOException {
assertEquals(partitions, Arrays.asList(new String[] {"0", "2", "3", "4"}));

// verify only commit actions
- partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
+ partitions = TimelineUtils.getWrittenPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
assertEquals(4, partitions.size());
assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"}));

- partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
+ partitions = TimelineUtils.getWrittenPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4"));
assertEquals(3, partitions.size());
assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"}));
}
@@ -194,7 +194,7 @@ private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, b
if (config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) {
writtenPartitionsSince = new ArrayList<>();
} else {
-   writtenPartitionsSince = syncClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
+   writtenPartitionsSince = syncClient.getWrittenPartitionsSince(lastCommitTimeSynced);
}
LOG.info("Scan partitions complete, partitionNum:{}", writtenPartitionsSince.size());

@@ -43,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
+ import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE;
@@ -199,9 +200,6 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
// Check if the necessary table exists
boolean tableExists = syncClient.tableExists(tableName);

- // check if isDropPartition
- boolean isDropPartition = syncClient.isDropPartition();
-
// Get the parquet schema for this table looking at the latest commit
MessageType schema = syncClient.getStorageSchema();

@@ -225,11 +223,13 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
lastCommitTimeSynced = syncClient.getLastCommitTimeSynced(tableName);
}
LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));
- List<String> writtenPartitionsSince = syncClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
+ List<String> writtenPartitionsSince = syncClient.getWrittenPartitionsSince(lastCommitTimeSynced);
LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());

// Sync the partitions if needed
- boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDropPartition);
+ // find dropped partitions, if any, in the latest commit
+ Set<String> droppedPartitions = syncClient.getDroppedPartitionsSince(lastCommitTimeSynced);
+ boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, droppedPartitions);
boolean meetSyncConditions = schemaChanged || partitionsChanged;
if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) {
syncClient.updateLastCommitTimeSynced(tableName);
@@ -310,12 +310,12 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
* Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the
* partition path does not match, it updates the partition path).
*/
- private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince, boolean isDropPartition) {
+ private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince, Set<String> droppedPartitions) {
boolean partitionsChanged;
try {
List<Partition> hivePartitions = syncClient.getAllPartitions(tableName);
List<PartitionEvent> partitionEvents =
-     syncClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, isDropPartition);
+     syncClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, droppedPartitions);

List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
if (!newPartitions.isEmpty()) {
@@ -197,8 +197,8 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode)
+ "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'");

hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
- List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
- List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false);
+ List<String> writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.empty());
+ List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet());
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType,
"The one partition event must of type UPDATE");
@@ -475,10 +475,10 @@ public void testSyncIncremental(String syncMode) throws Exception {

// Lets do the sync
reSyncHiveTable();
- List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1));
+ List<String> writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.of(commitTime1));
assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
List<org.apache.hudi.sync.common.model.Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
- List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false);
+ List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet());
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD");

@@ -754,10 +754,10 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception {
HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2);

reinitHiveSyncClient();
- List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime));
+ List<String> writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.of(instantTime));
assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
List<org.apache.hudi.sync.common.model.Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
- List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false);
+ List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet());
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD");

@@ -784,7 +784,7 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception {
"Table partitions should match the number of partitions we wrote");
assertEquals(commitTime3, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
"The last commit that was synced should be updated in the TBLPROPERTIES");
- assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size());
+ assertEquals(1, hiveClient.getWrittenPartitionsSince(Option.of(commitTime2)).size());
}

@ParameterizedTest
@@ -854,17 +854,33 @@ public void testDropPartition(String syncMode) throws Exception {
"Table partitions should match the number of partitions we wrote");
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
"The last commit that was synced should be updated in the TBLPROPERTIES");
+ // add a partition but do not sync
+ String instantTime2 = "101";
+ String newPartition = "2010/02/01";
+ HiveTestUtil.addCOWPartition(newPartition, true, true, instantTime2);
+ HiveTestUtil.getCreatedTablesSet().add(HiveTestUtil.DB_NAME + "." + HiveTestUtil.TABLE_NAME);
+ partitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+ assertEquals(1, partitions.size(),
+     "Table partitions should match the number of partitions we wrote");
+ assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
+     "The last commit that was synced should be updated in the TBLPROPERTIES");
+
+ // create two replace commits to delete current partitions, but do not sync in between
String partitiontoDelete = partitions.get(0).getValues().get(0).replace("-", "/");
- // create a replace commit to delete current partitions
- HiveTestUtil.createReplaceCommit("101", partitiontoDelete, WriteOperationType.DELETE_PARTITION, true, true);
+ String instantTime3 = "102";
+ HiveTestUtil.createReplaceCommit(instantTime3, partitiontoDelete, WriteOperationType.DELETE_PARTITION, true, true);
+ String instantTime4 = "103";
+ HiveTestUtil.createReplaceCommit(instantTime4, newPartition, WriteOperationType.DELETE_PARTITION, true, true);

- // sync drop partitions
+ // now run hive sync
reinitHiveSyncClient();
reSyncHiveTable();

List<Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
assertEquals(0, hivePartitions.size(),
-     "Table should have 0 partition because of the drop the only one partition");
+     "Table should have no partitions");
+ assertEquals(instantTime4, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
+     "The last commit that was synced should be updated in the TBLPROPERTIES");
}

@ParameterizedTest
@@ -493,7 +493,7 @@ public static void createCommitFile(HoodieCommitMetadata commitMetadata, String
fsout.close();
}

- public static void createReplaceCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException {
+ public static void createReplaceCommitFile(HoodieReplaceCommitMetadata commitMetadata, String instantTime) throws IOException {
byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeReplaceFileName(instantTime));
@@ -20,16 +20,13 @@

import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
- import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
- import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
- import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.sync.common.model.Partition;
import org.apache.hudi.sync.common.model.PartitionEvent;
import org.apache.hudi.sync.common.model.PartitionValueExtractor;
@@ -41,8 +38,10 @@

import java.util.ArrayList;
import java.util.HashMap;
+ import java.util.HashSet;
import java.util.List;
import java.util.Map;
+ import java.util.Set;

import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
@@ -83,18 +82,17 @@ public boolean isBootstrap() {
return metaClient.getTableConfig().getBootstrapBasePath().isPresent();
}

- public boolean isDropPartition() {
-   try {
-     Option<HoodieCommitMetadata> hoodieCommitMetadata = HoodieTableMetadataUtil.getLatestCommitMetadata(metaClient);
-
-     if (hoodieCommitMetadata.isPresent()
-         && WriteOperationType.DELETE_PARTITION.equals(hoodieCommitMetadata.get().getOperationType())) {
-       return true;
-     }
-   } catch (Exception e) {
-     throw new HoodieSyncException("Failed to get commit metadata", e);
-   }
-   return false;
+ /**
+  * Get the set of dropped partitions since the last synced commit.
+  * If last sync time is not known then consider only active timeline.
+  * Going through archive timeline is a costly operation, and it should be avoided unless some start time is given.
+  */
+ public Set<String> getDroppedPartitionsSince(Option<String> lastCommitTimeSynced) {
+   HoodieTimeline timeline = lastCommitTimeSynced.isPresent() ? metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
+       .mergeTimeline(metaClient.getActiveTimeline())
+       .getCommitsTimeline()
+       .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE) : metaClient.getActiveTimeline();
+   return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
}
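
The archived-timeline merge matters once sync lags long enough for instants to be archived off the active timeline. A hedged usage sketch mirroring the HiveSyncTool call site above, with tableName as a placeholder:

    // Resolve the last synced commit, then collect partitions dropped since then.
    Option<String> lastSynced = syncClient.getLastCommitTimeSynced(tableName);
    Set<String> dropped = syncClient.getDroppedPartitionsSince(lastSynced);

When no prior sync exists, only the active timeline is scanned, so a first-time sync avoids the costly archived-timeline read.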

@Override
@@ -106,7 +104,7 @@ public MessageType getStorageSchema() {
}
}

- public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
+ public List<String> getWrittenPartitionsSince(Option<String> lastCommitTimeSynced) {
if (!lastCommitTimeSynced.isPresent()) {
LOG.info("Last commit time synced is not known, listing all partitions in "
+ config.getString(META_SYNC_BASE_PATH)
@@ -118,16 +116,19 @@ public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSyn
config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
} else {
LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
-   return TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().getCommitsTimeline()
-       .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
+   return TimelineUtils.getWrittenPartitions(
+       metaClient.getArchivedTimeline(lastCommitTimeSynced.get())
+           .mergeTimeline(metaClient.getActiveTimeline())
+           .getCommitsTimeline()
+           .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
}
}
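
If the last synced commit has already been archived, the active timeline alone would silently skip partitions written in the archived instants; merging the archived timeline closes that gap. A hypothetical illustration (the instant times are invented):

    // Suppose commits 100-105 were archived and 106-110 are active, with the
    // last sync at 100. The merged timeline recovers partitions written by
    // 101-105 that the active timeline no longer holds.
    HoodieTimeline merged = metaClient.getArchivedTimeline("100")
        .mergeTimeline(metaClient.getActiveTimeline())
        .getCommitsTimeline()
        .findInstantsAfter("100", Integer.MAX_VALUE);
    List<String> written = TimelineUtils.getWrittenPartitions(merged);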

/**
* Iterate over the storage partitions and find if there are any new partitions that need to be added or updated.
* Generate a list of PartitionEvent based on the changes required.
*/
- public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, boolean isDropPartition) {
+ public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, Set<String> droppedPartitions) {
Map<String, String> paths = new HashMap<>();
for (Partition tablePartition : tablePartitions) {
List<String> hivePartitionValues = tablePartition.getValues();
@@ -143,7 +144,7 @@ public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions,
// Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);

-       if (isDropPartition) {
+       if (droppedPartitions.contains(storagePartition)) {
events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
} else {
if (!storagePartitionValues.isEmpty()) {
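
Taken together, event generation now keys off a per-partition dropped set instead of a single table-wide flag, so a batch that writes some partitions and drops others is handled in one sync pass. A small sketch of the resulting behavior, reusing syncClient and hivePartitions from the surrounding code and inventing the partition values:

    // One storage partition still live, one dropped since the last sync.
    List<String> storagePartitions = Arrays.asList("2010/01/01", "2010/02/01");
    Set<String> droppedPartitions = Collections.singleton("2010/02/01");
    List<PartitionEvent> events =
        syncClient.getPartitionEvents(hivePartitions, storagePartitions, droppedPartitions);
    // "2010/02/01" yields a DROP event; "2010/01/01" yields ADD or UPDATE as before.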