[HUDI-4760] Fixing repeated trigger of data file creations w/ clustering (#6561)

- In clustering, data file creation was being triggered twice: the write status RDD was not cached, and a validation step called isEmpty() on the JavaRDD, which re-triggered the action. This patch fixes that double dereferencing (illustrated by the standalone sketch below the changed-file summary).
nsivabalan authored and yuzhaojing committed Sep 29, 2022
1 parent 19b74ea commit d75f5ae
Showing 3 changed files with 64 additions and 4 deletions.
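
For context on the Spark behavior the commit message refers to, here is a minimal, self-contained sketch (plain Spark, not Hudi code; the class and variable names are illustrative) showing how every action on an un-cached JavaRDD re-runs its upstream computation, so a validation check such as isEmpty() plus the actual consumption of the results executes the expensive stage more than once:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;

public class UncachedRddDoubleExecutionSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("uncached-rdd-sketch").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // Counts how many times the "expensive" map function actually runs on the executors.
      LongAccumulator executions = jsc.sc().longAccumulator("simulated data file creations");

      // Stand-in for the clustering stage that creates data files and emits write statuses.
      JavaRDD<String> writeStatuses = jsc.parallelize(Arrays.asList(1, 2, 3, 4), 2)
          .map(i -> {
            executions.add(1);
            return "file-" + i;
          });

      // Action #1: a validation-style check. Action #2: the real consumption of the result.
      // Because writeStatuses is not persisted, the map above is recomputed for each action.
      boolean empty = writeStatuses.isEmpty();
      int produced = writeStatuses.collect().size();

      // The execution count ends up larger than the input size of 4, i.e. the stage ran
      // more than once. Persisting the RDD before the first action, or validating only a
      // handle that is dereferenced anyway, avoids the repeated computation.
      System.out.println("empty=" + empty + ", produced=" + produced
          + ", map executions=" + executions.value());
    }
  }
}

In the patch itself, the validateWriteResult call is dropped from executeClustering (first file below) and replaced by validateClusteringCommit in the Spark write client, invoked only after the write statuses have been cloned into the JavaRDD-based metadata (second file below), so that, as the in-code comment notes, the write status is not dereferenced twice.
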
@@ -166,7 +166,7 @@ protected void runPrecommitValidators(HoodieWriteMetadata<O> writeMetadata) {
}
throw new HoodieIOException("Precommit validation not implemented for all engines yet");
}

protected void commitOnAutoCommit(HoodieWriteMetadata result) {
// validate commit action before committing result
runPrecommitValidators(result);
@@ -249,7 +249,6 @@ protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieC
HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
validateWriteResult(clusteringPlan, writeMetadata);
commitOnAutoCommit(writeMetadata);
if (!writeMetadata.getCommitMetadata().isPresent()) {
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),

@@ -18,6 +18,8 @@

package org.apache.hudi.client;

import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.TransactionUtils;
@@ -37,7 +39,9 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
@@ -356,6 +360,8 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstan
LOG.info("Starting clustering at " + clusteringInstant);
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(context, clusteringInstant);
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
// Validation has to be done after cloning. If not, it could result in dereferencing the write status twice, which means clustering could get executed twice.
validateClusteringCommit(clusteringMetadata, clusteringInstant, table);
// TODO : Where is shouldComplete used ?
if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
@@ -403,6 +409,19 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
LOG.info("Clustering successfully on commit " + clusteringCommitTime);
}

private void validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata, String clusteringCommitTime, HoodieTable table) {
if (clusteringMetadata.getWriteStatuses().isEmpty()) {
HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(clusteringCommitTime))
.map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
"Unable to read clustering plan for instant: " + clusteringCommitTime));
throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + clusteringCommitTime
+ " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
+ clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+ " write statuses");
}
}

private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
HoodieInstant hoodieInstant) {
boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());

@@ -45,6 +45,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.WriteOperationType;
@@ -106,6 +107,7 @@

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -1459,6 +1461,42 @@ public void testSimpleClustering(boolean populateMetaFields, boolean preserveCom
testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
}

@Test
public void testAndValidateClusteringOutputFiles() throws IOException {
String partitionPath = "2015/03/16";
testInsertTwoBatches(true, partitionPath);

// Trigger clustering
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withEmbeddedTimelineServerEnabled(false).withAutoCommit(false)
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(2).build());
try (SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build())) {
int numRecords = 200;
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records1 = dataGen.generateInserts(newCommitTime, numRecords);
client.startCommitWithTime(newCommitTime);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(records1, 2);
JavaRDD<WriteStatus> statuses = client.insert(insertRecordsRDD1, newCommitTime);
client.commit(newCommitTime, statuses);
List<WriteStatus> statusList = statuses.collect();
assertNoWriteErrors(statusList);

metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieInstant replaceCommitInstant = metaClient.getActiveTimeline().getCompletedReplaceTimeline().firstInstant().get();
HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(replaceCommitInstant).get(), HoodieReplaceCommitMetadata.class);

List<String> filesFromReplaceCommit = new ArrayList<>();
replaceCommitMetadata.getPartitionToWriteStats()
.forEach((k,v) -> v.forEach(entry -> filesFromReplaceCommit.add(entry.getPath())));

// Find all parquet files created as part of clustering. Verify they match what is found in the replace commit metadata.
FileStatus[] fileStatuses = fs.listStatus(new Path(basePath + "/" + partitionPath));
List<String> clusteredFiles = Arrays.stream(fileStatuses).filter(entry -> entry.getPath().getName().contains(replaceCommitInstant.getTimestamp()))
.map(fileStatus -> partitionPath + "/" + fileStatus.getPath().getName()).collect(Collectors.toList());
assertEquals(clusteredFiles, filesFromReplaceCommit);
}
}

@Test
public void testRolblackOfRegularCommitWithPendingReplaceCommitInTimeline() throws Exception {
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
@@ -1710,18 +1748,22 @@ private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig cluste
return allRecords.getLeft().getLeft();
}

private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
return testInsertTwoBatches(populateMetaFields, "2015/03/16");
}

/**
* This method returns the following three items:
* 1. List of all HoodieRecord written in the two batches of insert.
* 2. Commit instants of the two batches.
* 3. List of new file group ids that were written in the two batches.
*/
private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws IOException {
// create config to not update small files.
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields,
populateMetaFields ? new Properties() : getPropertiesForKeyGen());
SparkRDDWriteClient client = getHoodieWriteClient(config);
dataGen = new HoodieTestDataGenerator(new String[] {"2015/03/16"});
dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields);