Skip to content

[HUDI-8340] Fixing functional index record generation using spark distributed computation#12091

Closed
nsivabalan wants to merge 3 commits intoapache:masterfrom
nsivabalan:HUDI-8340-fixingFunctionalIndexUpdates1
Closed

[HUDI-8340] Fixing functional index record generation using spark distributed computation#12091
nsivabalan wants to merge 3 commits intoapache:masterfrom
nsivabalan:HUDI-8340-fixingFunctionalIndexUpdates1

Conversation

@nsivabalan
Copy link
Contributor

Change Logs

Fixing functional index record generation using spark distributed computation.
This patch is stacked on top of #12090

Impact

Fixing functional index record generation using spark distributed computation

Risk level (write none, low medium or high below)

low

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Oct 13, 2024
@codope codope force-pushed the HUDI-8340-fixingFunctionalIndexUpdates1 branch from 978ae60 to 438f932 Compare October 14, 2024 09:14
@codope codope changed the title [HUDI-8340][HUDI-8341] Fixing functional index record generation using spark distributed computation [HUDI-8340] Fixing functional index record generation using spark distributed computation Oct 14, 2024
int parallelism = Math.min(partitionFileSlicePairs.size(), dataWriteConfig.getMetadataConfig().getFunctionalIndexParallelism());
List<Pair<String, Pair<String, Long>>> partitionFilePathPairs = new ArrayList<>();
commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> writeStats.forEach(writeStat -> partitionFilePathPairs.add(
Pair.of(writeStat.getPartitionPath(), Pair.of(new StoragePath(dataMetaClient.getBasePath(), writeStat.getPath()).toString(), writeStat.getFileSizeInBytes())))));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needed to combine with base path to form the full path (that's what readers expect)

Comment on lines +177 to +178
ForkJoinPool customThreadPool = new ForkJoinPool(parallelism);
List<HoodieRecord> allRecords = customThreadPool.submit(() ->
Copy link
Member

@codope codope Oct 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't use engine context parallelism, because we need the context for dataset creation on which spark functions can be applied. So, going with usual java parallelism, which is still better than the previous sequential stats computation.

@codope codope force-pushed the HUDI-8340-fixingFunctionalIndexUpdates1 branch from b2d9b4f to 4ebb134 Compare October 14, 2024 19:28
@hudi-bot
Copy link
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@nsivabalan nsivabalan closed this Oct 19, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:M PR with lines of changes in (100, 300]

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants