
[HUDI-6534]Support consistent hashing row writer #9199

Conversation

@stream2000 (Contributor) commented Jul 14, 2023

Change Logs

Support a consistent hashing row writer for bulk_insert and clustering.

Impact

Adds support for the consistent hashing bulk_insert row writer.

Risk level (write none, low medium or high below)

Medium. This will be enabled by default, since the row writer is enabled by default.

Documentation Update

Will update the documentation after this lands.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@stream2000 stream2000 changed the title [HUDI-6534]Support consistent hashing row write [HUDI-6534]Support consistent hashing row writer Jul 14, 2023
@stream2000 stream2000 marked this pull request as draft July 14, 2023 11:09
@stream2000 stream2000 changed the title [HUDI-6534]Support consistent hashing row writer [HUDI-6534][WIP]Support consistent hashing row writer Jul 14, 2023
@stream2000 stream2000 force-pushed the HUDI-6534_consistent_hashing_buceket_index_row_writer branch 3 times, most recently from 5fa5151 to 8f1d8ef Compare July 16, 2023 09:39
@stream2000 stream2000 marked this pull request as ready for review July 20, 2023 02:54
@stream2000 stream2000 force-pushed the HUDI-6534_consistent_hashing_buceket_index_row_writer branch 2 times, most recently from d7062e5 to b8a0a16 Compare July 20, 2023 09:36
@stream2000 stream2000 changed the title [HUDI-6534][WIP]Support consistent hashing row writer [HUDI-6534]Support consistent hashing row writer Jul 20, 2023
@stream2000 stream2000 force-pushed the HUDI-6534_consistent_hashing_buceket_index_row_writer branch 7 times, most recently from 23579f5 to 3eca97c Compare July 23, 2023 10:04
@stream2000 (Contributor Author)

@hudi-bot run azure

this.keyGeneratorOpt = Option.empty();
}
this.extractor = RowRecordKeyExtractor.getRowRecordKeyExtractor(populateMetaFields, keyGeneratorOpt);
ValidationUtils.checkArgument(table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ),
Contributor

Is the check by design for the consistent hashing index? If yes, could we move the check to the parent class?

Contributor Author

Yes, we do a dual write during consistent hashing bucket index resizing, but a CoW table does not support writing logs. It is also a bit hard to move this to a parent class, since the closest common parent of these two consistent hashing partitioners is BulkInsertPartitioner.
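
A sketch of the table-type guard under discussion; the check itself is in the diff above, and only the error message here is illustrative:

// Consistent hashing bucket resizing dual-writes log files, which only MERGE_ON_READ
// tables support, so reject other table types up front (message text is illustrative).
ValidationUtils.checkArgument(
    table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ),
    "Consistent hashing bulk_insert only supports MERGE_ON_READ tables, since bucket resizing relies on log files");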

@leesf leesf self-assigned this Jul 25, 2023
@@ -83,7 +83,7 @@ private static void extractHashingMetadataFromClusteringPlan(String instant, Hoo
ValidationUtils.checkState(p != null, "Clustering plan does not has partition info, plan: " + plan);
// Skip unrelated clustering group
if (!recordPartitions.contains(p)) {
return;
continue;
Contributor

Would you please clarify why the logic changed here?

Contributor Author

It's a bug in the original code. The set recordPartitions contains all partitions that we care about, and we iterate over the input groups in the clustering plan to construct the hashing metadata. If the current input group is not included in recordPartitions, we need to continue to the next input group instead of returning directly.
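
For illustration, a minimal sketch of the intended loop shape (the helper names partitionOf and extractHashingMetadataForGroup are hypothetical, not the actual Hudi methods):

// Iterate every input group in the clustering plan and skip only the groups whose
// partition is not of interest; a `return` here would drop all remaining groups.
for (HoodieClusteringGroup group : clusteringPlan.getInputGroups()) {
  String partition = partitionOf(group);                       // hypothetical accessor
  if (!recordPartitions.contains(partition)) {
    continue;
  }
  extractHashingMetadataForGroup(instant, partition, group);   // hypothetical helper
}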

/**
* Set consistent hashing for partition, used in clustering
*
* @param partition partition to set Consistent Hashing nodes
Contributor

Consistent Hashing -> consistent hashing

@@ -79,15 +94,19 @@ public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<Hoodie
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();

RDDConsistentBucketBulkInsertPartitioner<T> partitioner = new RDDConsistentBucketBulkInsertPartitioner<>(getHoodieTable(), strategyParams, preserveHoodieMetadata);
addHashingChildNodes(partitioner, extraMetadata);
Contributor

I am a little confused by the naming of ConsistentHashingBucketInsertPartitioner#addHashingChildrenNodes.

Contributor Author

Here I made a new interface called ConsistentHashingBucketInsertPartitioner to share the common method addHashingChildrenNodes between RDDConsistentBucketBulkInsertPartitioner and ConsistentBucketIndexBulkInsertPartitionerWithRows. I didn't put addHashingChildrenNodes in their closest parent class, BulkInsertPartitioner, because it is specific to the consistent hashing bucket index and I don't want to affect other partitioners. What do you think about this design? Should I keep the current design, move this to the parent class, or duplicate the code in both partitioners?
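
For reference, a rough sketch of the shared interface being described; the method signature is taken from the addHashingChildrenNodes override quoted later in this review, and the javadoc wording is only indicative:

import java.util.List;

/**
 * Shared by the consistent hashing bulk-insert partitioners (RDD- and row-writer-based)
 * so clustering can hand the child nodes from the resizing plan to either of them.
 */
public interface ConsistentHashingBucketInsertPartitioner {

  void addHashingChildrenNodes(String partition, List<ConsistentHashingNode> nodes);
}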

try {
return Option.of((BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties));
} catch (ClassCastException cce) {
throw new HoodieIOException("Only those key generators implementing BuiltInKeyGenerator interface is supported with virtual keys");
Contributor

also throw the cce here?

Contributor Author

The code is moved from BulkInsertDataInternalWriterHelper, and I think it's reasonable to translate the cce into a more readable exception, HoodieIOException.
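
If the original exception should still be surfaced, one option is to attach it as the cause. The sketch below uses HoodieException, which has a (String, Throwable) constructor; HoodieIOException's constructors may only accept IOException causes, so treat the exact exception type as an assumption:

try {
  return Option.of((BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties));
} catch (ClassCastException cce) {
  // Chain the original ClassCastException so its stack trace is not lost.
  throw new HoodieException(
      "Only key generators implementing the BuiltinKeyGenerator interface are supported with virtual keys", cce);
}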

*
* @param partitionToIdentifier Mapping from table partition to bucket identifier
*/
public static Map<String, Map<String, Integer>> generatePartitionToFileIdPfxIdxMap(Map<String, ConsistentBucketIdentifier> partitionToIdentifier) {
Contributor

Can you also add UTs for this method?

Contributor Author

Sure, will add more tests~

Contributor Author

Added test in TestConsistentBucketIdIdentifier.java

return super.getWriteHandleFactory(idx).map(writeHandleFactory -> new WriteHandleFactory() {
@Override
public HoodieWriteHandle create(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, String partitionPath, String fileIdPrefix, TaskContextSupplier taskContextSupplier) {
// Ensure we do not create append handle for consistent hashing bulk_insert, align with `ConsistentBucketBulkInsertDataInternalWriterHelper`
Contributor Author

For reviewers: When we bulk insert twice into a consistent hashing bucket index table, the second bulk insert needs to write logs to existing file groups, whereas for a normal bloom filter index table we always create new base files during bulk insert. However, the bulk insert row writer path currently does not support writing logs, so I added a check here to prevent users from bulk inserting twice into a consistent hashing bucket index table. After the first bulk insert, upsert should be used for a consistent hashing bucket index table.
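
A hedged sketch of the guard being described, placed where the handle for a bucket is obtained; it assumes the slice view exposes getLatestFileSlice(partitionPath, fileId), so adjust to the actual API if it differs:

// Fail fast if the target file group already has a file slice: the bulk_insert row-writer
// path cannot append log files, so a second bulk_insert into the same bucket is rejected.
String fileId = FSUtils.createNewFileId(fileIdPrefix, 0);
ValidationUtils.checkArgument(
    !hoodieTable.getSliceView().getLatestFileSlice(partitionPath, fileId).isPresent(),
    "Found existing file group " + fileId
        + "; use upsert/insert instead of a second bulk_insert for a consistent hashing table");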

return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf,
Arrays.stream(partitions).map(p -> Paths.get(basePath, p).toString()).collect(Collectors.toList()),
basePath, new JobConf(hadoopConf), true, false);
private List<Row> readRecords() {
Contributor Author

For reviewers: Reading the data written by the bulk insert row writer hits the same issue as #8838, so I use Spark instead of the input format to read the data here.
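
For reference, a minimal sketch of reading the table back through the Hudi Spark datasource instead of the MOR input format (assumes a SparkSession named spark and the test table's basePath):

// Datasource read sidesteps the input-format issue referenced in #8838.
Dataset<Row> rows = spark.read().format("hudi").load(basePath);
List<Row> records = rows.collectAsList();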

@danny0405 (Contributor)

@leesf, is it good to land now? We still have 2 days before the 0.14.0 code freeze.

@danny0405 danny0405 added the spark Issues related to spark label Aug 3, 2023
@stream2000 stream2000 force-pushed the HUDI-6534_consistent_hashing_buceket_index_row_writer branch from 3eca97c to 535bf53 Compare August 3, 2023 12:58
@stream2000 stream2000 force-pushed the HUDI-6534_consistent_hashing_buceket_index_row_writer branch 4 times, most recently from abf8721 to 884a71a Compare August 4, 2023 00:15
.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, partitioner, true, numOutputGroups);
}

private void addHashingChildNodes(ConsistentHashingBucketInsertPartitioner partitioner, Map<String, String> extraMetadata) {
try {
List<ConsistentHashingNode> nodes = ConsistentHashingNode.fromJsonString(extraMetadata.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY));
Contributor

Could the nodes be null and need a check in addHashingChildrenNodes?

Contributor Author

Will check that the nodes are non-empty in addHashingChildrenNodes.


@Override
public void addHashingChildrenNodes(String partition, List<ConsistentHashingNode> nodes) {
ValidationUtils.checkState(nodes.stream().noneMatch(n -> n.getTag() == ConsistentHashingNode.NodeTag.NORMAL),
Contributor

what's the reason the tag will not be NORMAL?

Contributor Author

NORMAL nodes are read from the metadata in the file system, while REPLACE and DELETE nodes come from the consistent hashing resizing plan. Here we are adding nodes read from the clustering plan to the hashing metadata, so none of them should be NORMAL.

@@ -65,7 +71,6 @@ public void write(InternalRow row) throws IOException {
int bucketId = BucketIdentifier.getBucketId(String.valueOf(recordKey), indexKeyFields, bucketNum);
Pair<UTF8String, Integer> fileId = Pair.of(partitionPath, bucketId);
if (lastFileId == null || !lastFileId.equals(fileId)) {
LOG.info("Creating new file for partition path " + partitionPath);
Contributor

this log is useless then?

Contributor Author

We do not always create a new file handle if we have already created one for this partition, so I deleted this line. I will add it back in getBucketRowCreateHandle, where we actually create a new handle.


private static final Logger LOG = LoggerFactory.getLogger(ConsistentBucketBulkInsertDataInternalWriterHelper.class);

public ConsistentBucketBulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
Contributor

The constructor has too many args; could they be wrapped in a class?
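
One way to act on this suggestion is a small parameter object; the class below is purely illustrative and not part of Hudi, grouping just the Spark task-identity arguments as an example:

// Hypothetical parameter object: bundles the task identity so the writer-helper
// constructors take one argument instead of three separate primitives.
public final class WriterTaskContext {
  private final int taskPartitionId;
  private final long taskId;
  private final long taskEpochId;

  public WriterTaskContext(int taskPartitionId, long taskId, long taskEpochId) {
    this.taskPartitionId = taskPartitionId;
    this.taskId = taskId;
    this.taskEpochId = taskEpochId;
  }

  public int getTaskPartitionId() { return taskPartitionId; }

  public long getTaskId() { return taskId; }

  public long getTaskEpochId() { return taskEpochId; }
}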

@stream2000 stream2000 force-pushed the HUDI-6534_consistent_hashing_buceket_index_row_writer branch 2 times, most recently from 2a3b8ac to 445cf75 Compare August 7, 2023 09:56
@stream2000 stream2000 force-pushed the HUDI-6534_consistent_hashing_buceket_index_row_writer branch from 445cf75 to 19efc8e Compare August 7, 2023 15:09
@hudi-bot commented Aug 7, 2023

CI report:

Bot commands supported by @hudi-bot:
  • @hudi-bot run azure: re-run the last Azure build

@leesf leesf merged commit 7541cd7 into apache:master Aug 8, 2023
27 checks passed
@prashantwason (Member)

@stream2000 The build is failing due to a test failure caused by this commit. Can you please check?
https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_apis/build/builds/19273/logs/25

This is blocking 0.14.0 release so please prioritize if possible.

@prashantwason (Member)

@hudi-bot run azure

@danny0405 (Contributor)

@prashantwason You can cherry-pick #9401.

String fileId = FSUtils.createNewFileId(node.getFileIdPrefix(), 0);

ValidationUtils.checkArgument(node.getTag() != ConsistentHashingNode.NodeTag.NORMAL
|| hoodieTable.getFileSystemView()
Contributor

Is this check too heavy to do when starting to write to a new file group?

I think it might be simpler if we add a file-existence check in the constructor of HoodieRowCreateHandle and fail the writer if the file already exists.

e.g.

  public HoodieRowCreateHandle(HoodieTable table,
                               HoodieWriteConfig writeConfig,
                               String partitionPath,
                               String fileId,
                               String instantTime,
                               int taskPartitionId,
                               long taskId,
                               long taskEpochId,
                               StructType structType,
                               boolean shouldPreserveHoodieMetadata) {
    ...
    FileSystem fs = table.getMetaClient().getFs();

    String writeToken = getWriteToken(taskPartitionId, taskId, taskEpochId);
    String fileName = FSUtils.makeBaseFileName(instantTime, writeToken, this.fileId, table.getBaseFileExtension());
    this.path = makeNewPath(fs, partitionPath, fileName, writeConfig);
    if (fs.exists(this.path)) {
      throw new HoodieException("File already exist. Can't bulk insert. Can use insert|upsert"); 
    }
    ...

Contributor

Can one writer helper write multiple file groups?

@TengHuo (Contributor) commented Oct 30, 2023

Yes, one writer helper can write multiple file groups, but one HoodieRowCreateHandle object can only write one parquet file, which is this.path. The writer helper switches between the different handles.

So in BulkInsertDataInternalWriterHelper.write(InternalRow row), BucketBulkInsertDataInternalWriterHelper.write(InternalRow row) and ConsistentBucketBulkInsertDataInternalWriterHelper.write(InternalRow row), a new HoodieRowCreateHandle object is created whenever the incoming row needs to be written into a new parquet file.

We only have this existing-file-group check in ConsistentBucketBulkInsertDataInternalWriterHelper.getBucketRowCreateHandle, but the other bulk insert writers, BulkInsertDataInternalWriterHelper and BucketBulkInsertDataInternalWriterHelper, need this check as well. So I think it would be simpler to add the check in the constructor of HoodieRowCreateHandle.
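
A simplified sketch of that handle-switching pattern, based on the write(InternalRow) diff quoted earlier (key extraction and error handling omitted; getBucketRowCreateHandle's signature is assumed):

// Reuse the current handle while consecutive rows target the same (partition, bucket);
// otherwise look up or create the HoodieRowCreateHandle bound to the new parquet file.
Pair<UTF8String, Integer> fileId = Pair.of(partitionPath, bucketId);
if (lastFileId == null || !lastFileId.equals(fileId)) {
  handle = getBucketRowCreateHandle(partitionPath, bucketId);  // may create a new handle
  lastFileId = fileId;
}
handle.write(row);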

@TengHuo (Contributor) commented Oct 30, 2023

Just checked the code: this writer helper class is used in HoodieDatasetBulkInsertHelper.bulkInsert within a mapPartitions call, and in org.apache.hudi.spark3.internal.HoodieBulkInsertDataInternalWriter.

Contributor

If it looks like a reasonable improvement, you can raise a PR; maybe @stream2000 can help with a review~

Contributor

Yeah sure, I'm adding a few tests in our internal code and will raise a PR with the test cases and the fix.

Contributor Author

@TengHuo I think using the file system view, which can filter out replaced/uncommitted file slices, is the more common way to get information about file slices. Also, is the write token the same across every write for the same file id?

Contributor

Oh got it. Indeed, the writeToken is different every time a task or stage fails and retries. So just checking fs.exists(this.path) is not good enough; we need to check the file group together with the instantTime. In that case, using hoodieTable.getFileSystemView() is better than listing the partition.

writeConfig.populateMetaFields,
arePartitionRecordsSorted,
shouldPreserveHoodieMetadata)
case _ =>
Contributor

Should we support SIMPLE bucket index as well?

It uses BucketBulkInsertDataInternalWriterHelper.
