[HUDI-4760] Fixing repeated trigger of data file creations w/ clustering #6561

Merged · 7 commits · Sep 26, 2022
Changes from 4 commits
@@ -166,7 +166,7 @@ protected void runPrecommitValidators(HoodieWriteMetadata<O> writeMetadata) {
}
throw new HoodieIOException("Precommit validation not implemented for all engines yet");
}

protected void commitOnAutoCommit(HoodieWriteMetadata result) {
// validate commit action before committing result
runPrecommitValidators(result);
@@ -249,7 +249,7 @@ protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieC
HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
validateWriteResult(clusteringPlan, writeMetadata);
Contributor
Let's at least leave a comment (in this PR) that this line was dereferencing the RDD a second time, writing out the files twice.

// if we don't cache the write statuses above, validation will call isEmpty which might retrigger the execution again.
nsivabalan marked this conversation as resolved.
commitOnAutoCommit(writeMetadata);
if (!writeMetadata.getCommitMetadata().isPresent()) {
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
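The thread above concerns Spark's lazy evaluation: the write-status RDD is only a lineage, so each action that touches it (the isEmpty() call inside validateWriteResult, and the later collection of write stats for the commit) replays the clustering write unless the statuses are cached first. Below is a minimal, self-contained sketch of that behavior using plain Spark types rather than Hudi classes; the class name, variable names, and the println side effect are illustrative stand-ins only.

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class LazyWriteStatusSketch {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setMaster("local[2]").setAppName("lazy-write-status-sketch"));

    // The map side effect stands in for "writing a data file" during clustering.
    JavaRDD<Integer> writeStatuses = jsc.parallelize(Arrays.asList(1, 2, 3), 3)
        .map(x -> {
          System.out.println("writing file for record " + x);
          return x;
        });

    // Without persist(), every action (isEmpty() for validation, count()/collect() for the commit)
    // re-runs the lineage above, i.e. the "files" get written more than once.
    writeStatuses.persist(StorageLevel.MEMORY_AND_DISK());

    long written = writeStatuses.count();     // first action materializes and caches all partitions
    boolean empty = writeStatuses.isEmpty();  // later actions read the cache; nothing is rewritten
    System.out.println("statuses=" + written + ", empty=" + empty);

    jsc.stop();
  }
}
```

Caching the statuses before validation, which is what the `// if we don't cache the write statuses above ...` comment in the diff refers to, turns the emptiness check into a cheap read instead of a second execution of the write job.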
@@ -18,6 +18,8 @@

package org.apache.hudi.client;

import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.TransactionUtils;
@@ -37,7 +39,9 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
@@ -356,6 +360,7 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstan
LOG.info("Starting clustering at " + clusteringInstant);
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(context, clusteringInstant);
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
validateClusteringCommit(clusteringMetadata, clusteringInstant, table);
Contributor
Let's add a comment here explaining why the validation has to be here.

// TODO : Where is shouldComplete used ?
if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
@@ -403,6 +408,19 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
LOG.info("Clustering successfully on commit " + clusteringCommitTime);
}

private void validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata, String clusteringCommitTime, HoodieTable table) {
if (clusteringMetadata.getWriteStatuses().isEmpty()) {
HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(clusteringCommitTime))
.map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
"Unable to read clustering plan for instant: " + clusteringCommitTime));
throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + clusteringCommitTime
+ " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
+ clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+ " write statuses");
}
}

private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
HoodieInstant hoodieInstant) {
boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
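On the reviewer's question of why the validation sits right after table.cluster() in the write client: the clustering output has to be checked before completeTableService() finalizes the replace commit, so an empty result fails the instant rather than committing it, and, assuming the statuses were cached upstream (see the earlier thread), the isEmpty() probe here is a cheap read rather than a re-execution. The lower bound reported in the exception is the sum of getNumOutputFileGroups() across the plan's input groups. A small plain-Java sketch of that arithmetic and check; the types and instant strings below are illustrative stand-ins, not Hudi classes:

```java
import java.util.Arrays;
import java.util.List;

public class ClusteringOutputCheckSketch {

  // Mirrors HoodieClusteringGroup#getNumOutputFileGroups for illustration only.
  static class ClusteringGroup {
    final int numOutputFileGroups;
    ClusteringGroup(int numOutputFileGroups) { this.numOutputFileGroups = numOutputFileGroups; }
  }

  // Fails fast when the clustering job produced no write statuses, before any commit is finalized.
  static void validateClusteringOutput(List<ClusteringGroup> inputGroups, int writeStatusCount, String instant) {
    int expectedAtLeast = inputGroups.stream().mapToInt(g -> g.numOutputFileGroups).sum();
    if (writeStatusCount == 0) {
      throw new IllegalStateException("Clustering plan produced 0 WriteStatus for " + instant
          + " #groups: " + inputGroups.size()
          + " expected at least " + expectedAtLeast + " write statuses");
    }
  }

  public static void main(String[] args) {
    List<ClusteringGroup> plan = Arrays.asList(new ClusteringGroup(2), new ClusteringGroup(3));
    validateClusteringOutput(plan, 5, "20220926000000"); // passes: 5 statuses for an expected minimum of 5
    try {
      validateClusteringOutput(plan, 0, "20220926000001"); // throws: nothing was written for this instant
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```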
@@ -45,6 +45,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.WriteOperationType;
@@ -106,6 +107,7 @@

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -1456,6 +1458,43 @@ public void testSimpleClustering(boolean populateMetaFields, boolean preserveCom
testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
}

@Test
public void testAndValidateClusteringOutputFiles() throws IOException {
String partitionPath = "2015/03/16";
testInsertTwoBatches(true, partitionPath);

// Trigger clustering
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder().withEmbeddedTimelineServerEnabled(false).withAutoCommit(false)
.withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(2).build());
try (SparkRDDWriteClient client = getHoodieWriteClient(cfgBuilder.build())) {
int numRecords = 200;
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records1 = dataGen.generateInserts(newCommitTime, numRecords);
client.startCommitWithTime(newCommitTime);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(records1, 2);
JavaRDD<WriteStatus> statuses = client.insert(insertRecordsRDD1, newCommitTime);
client.commit(newCommitTime, statuses);
List<WriteStatus> statusList = statuses.collect();
assertNoWriteErrors(statusList);

metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieInstant replaceCommitInstant = metaClient.getActiveTimeline().getCompletedReplaceTimeline().firstInstant().get();
HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(replaceCommitInstant).get(), HoodieReplaceCommitMetadata.class);

List<String> filesFromReplaceCommit = new ArrayList<>();
replaceCommitMetadata.getPartitionToWriteStats()
    .forEach((k, v) -> v.forEach(entry -> filesFromReplaceCommit.add(entry.getPath())));

// Find all parquet files created as part of clustering. Verify they match what's found in the replace commit metadata.
FileStatus[] fileStatuses = fs.listStatus(new Path(basePath + "/" + partitionPath));
List<String> clusteredFiles = Arrays.stream(fileStatuses).filter(entry -> entry.getPath().getName().contains(replaceCommitInstant.getTimestamp()))
.map(fileStatus -> partitionPath + "/" + fileStatus.getPath().getName()).collect(Collectors.toList());
assertEquals(clusteredFiles, filesFromReplaceCommit);
}
}

@Test
public void testRolblackOfRegularCommitWithPendingReplaceCommitInTimeline() throws Exception {
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
@@ -1707,18 +1746,22 @@ private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig cluste
return allRecords.getLeft().getLeft();
}

private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
return testInsertTwoBatches(populateMetaFields, "2015/03/16");
}

/**
* This method returns the following three items:
* 1. List of all HoodieRecord written in the two batches of insert.
* 2. Commit instants of the two batches.
* 3. Set of new file group ids that were written in the two batches.
*/
private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws IOException {
// create config to not update small files.
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields,
populateMetaFields ? new Properties() : getPropertiesForKeyGen());
SparkRDDWriteClient client = getHoodieWriteClient(config);
dataGen = new HoodieTestDataGenerator(new String[] {"2015/03/16"});
dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields);