[HUDI-6118] Some fixes to improve the MDT and record index code base.#9106

Merged
danny0405 merged 5 commits into apache:master from prashantwason:pw_testing_fixes
Jul 22, 2023

Conversation

@prashantwason (Member):

[HUDI-6118] Some fixes to improve the MDT and record index code base.

Change Logs

  1. Print the MDT partition name instead of the enum toString in logs
  2. Use fsView.loadAllPartitions()
  3. When publishing size metrics for the MDT, only consider partitions which have been initialized
  4. Fixed job status names
  5. Limited logs which were printing the entire list of partitions; this is very verbose for datasets with a large number of partitions
  6. Added a config to reduce the max parallelism of record index initialization
  7. Changed defaults for MDT write configs to reasonable values
  8. Added a config for MDT logBlock size; larger blocks are preferred to reduce lookup time
  9. Fixed the size metrics for the MDT; these metrics should be set instead of incremented
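Item 9 distinguishes two gauge-update modes. As a minimal sketch of why the difference matters (class and method names here are hypothetical, not Hudi's actual metrics API): a size metric must replace the previous value on each report, because accumulating it double-counts unchanged partitions.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of gauge semantics; all names are illustrative.
class GaugeRegistry {
  private final Map<String, Long> gauges = new HashMap<>();

  // Correct for size metrics: each report replaces the previous value.
  void setGauge(String name, long value) {
    gauges.put(name, value);
  }

  // Wrong for size metrics: repeated reports accumulate, so a stable
  // 10 MB partition reported twice shows up as 20 MB.
  void incGauge(String name, long delta) {
    gauges.merge(name, delta, Long::sum);
  }

  long get(String name) {
    return gauges.getOrDefault(name, 0L);
  }
}
```

Reporting the same 10 MB size twice yields 10 MB with set-style updates but 20 MB with increment-style updates, which is the bug the change log describes.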

Impact

Fixes issues for the recently committed RI and MDT changes

Risk level (write none, low, medium or high below)

Low

Documentation Update

None

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@prashantwason prashantwason requested a review from nsivabalan June 30, 2023 17:04
  public Map<String, String> stats() {
-   return metrics.map(m -> m.getStats(true, metadataMetaClient, this)).orElse(new HashMap<>());
+   Set<String> allMetadataPartitionPaths = Arrays.stream(MetadataPartitionType.values()).map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
+   return metrics.map(m -> m.getStats(true, metadataMetaClient, this, allMetadataPartitionPaths)).orElse(new HashMap<>());
Contributor:
Do we need to fetch the enabled partitions here instead of all?

Contributor:
HoodieMetadataMetrics.getStats(boolean detailed, HoodieTableMetaClient metaClient, HoodieTableMetadata metadata)

reloads the timeline. Can we move the reload outside of the caller so that we don't reload for every MDT partition's stats?

Member Author:
Removed the reload of the timeline. It is not actually required, since this code is called right after commit, where the metaClient is reloaded anyway.

  try {
-   LOG.info("Building file system view for partitions " + partitionSet);
+   if (partitionSet.size() < 100) {
+     LOG.info("Building file system view for partitions: " + partitionSet);
Contributor:
Can we just switch to LOG.debug? Is the logging really useful?

Contributor:
Yes, maybe we should reconsider the frequency of logging here, e.g. log every 100 partitions or so. Not sure we gain much by logging this for every partition.

Member Author:
Converted to a debug log.
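The resolution above (log the full partition list only when it is small, and at debug level) can be sketched as follows; the threshold, class name, and use of `java.util.logging` are illustrative, not the exact Hudi code:

```java
import java.util.Set;
import java.util.logging.Logger;

// Sketch of bounding log verbosity by partition count, per the review outcome.
class PartitionLogging {
  private static final Logger LOG = Logger.getLogger(PartitionLogging.class.getName());
  private static final int MAX_PARTITIONS_TO_LOG = 100;

  static String describe(Set<String> partitionSet) {
    // Print the full list only when it is small; otherwise just the count.
    return partitionSet.size() < MAX_PARTITIONS_TO_LOG
        ? "Building file system view for partitions: " + partitionSet
        : "Building file system view for " + partitionSet.size() + " partitions";
  }

  static void log(Set<String> partitionSet) {
    LOG.fine(describe(partitionSet)); // debug-level, so large datasets stay quiet by default
  }
}
```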

// File groups in each partition are fixed at creation time and we do not want them to be split into multiple files
// ever. Hence, we use a very large basefile size in metadata table. The actual size of the HFiles created will
// eventually depend on the number of file groups selected for each partition (See estimateFileGroupCount function)
final long maxHFileSizeBytes = 10 * 1024 * 1024 * 1024L; // 10GB
Contributor:
Can we define them as static constants instead?

Member Author:
Done.
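The refactor agreed on above amounts to hoisting the magic number into a named static constant; the class and constant names below are illustrative, not the actual Hudi identifiers:

```java
// Sketch of the suggested refactor: a named static constant instead of an
// inline literal (names are hypothetical).
class MetadataWriteDefaults {
  // File groups in each MDT partition are fixed at creation time, so a very
  // large base file size prevents them from ever being split.
  static final long MAX_HFILE_SIZE_BYTES = 10 * 1024 * 1024 * 1024L; // 10GB
}
```

Note the trailing `L`: the first three factors multiply as ints, and the final multiplication by `1024L` promotes the result to long before it would overflow.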

// Keeping the log blocks as large as the log files themselves reduces the number of HFile blocks to be checked for
// presence of keys.
final long maxLogFileSizeBytes = writeConfig.getMetadataConfig().getMaxLogFileSize();
final long maxLogBlockSizeBytes = maxLogFileSizeBytes;
Contributor:
redundant?

Member Author:
Removed. Moved the comment to where it is used.

@danny0405 (Contributor) left a comment:
Thanks for the contribution @prashantwason. The cleaning strategy change for the MDT is huge; can you elaborate on the details here?


@nsivabalan nsivabalan added release-0.14.0 priority:blocker Production down; release blocker labels Jul 4, 2023
@prashantwason (Member Author):

@danny0405 @nsivabalan I think the cleaning strategy change for the MDT is a bugfix because of the following:

  1. The initial commit on the MDT will create HFiles
  2. Rollbacks do not actually roll back the MDT; instead they add a -f1, -f2 deltacommit

If we use KEEP_LATEST_COMMITS, then a wrong setting here would probably keep only a single HFile, and that will limit rollback: we cannot roll back the MDT beyond the last HFile, as we would lose data.

@danny0405 (Contributor):

then a wrong setting here would probably keep only a single HFile

Can we add some validation logic in the metadata table write config builder to guard correctness? Keeping at least 2 versions for each file group will also double the storage for the metadata table.
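The validation danny0405 asks for could look roughly like this; the builder class, method, and retention bound below are all hypothetical and not the actual Hudi write-config API:

```java
// Hypothetical sketch of validating the MDT cleaner setting at build time;
// none of these names are the real Hudi write-config API.
class MetadataWriteConfigBuilder {
  private int cleanerCommitsRetained = 3;

  MetadataWriteConfigBuilder withCleanerCommitsRetained(int n) {
    this.cleanerCommitsRetained = n;
    return this;
  }

  void validate() {
    // Retaining too few commits risks cleaning away the HFiles needed to
    // roll the MDT back; fail fast instead of accepting an unsafe value.
    if (cleanerCommitsRetained < 2) {
      throw new IllegalArgumentException(
          "MDT cleaner must retain at least 2 commits, got " + cleanerCommitsRetained);
    }
  }
}
```

Failing fast in the builder keeps a misconfigured retention from silently limiting how far the MDT can be rolled back.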

@prashantwason (Member Author):

@hudi-bot run azure


@prashantwason (Member Author):

@danny0405 @nsivabalan I have reverted the change to the cleaning policy. PTAL again.

@nsivabalan (Contributor) left a comment:
@prashantwason: can we increase the value of DEFAULT_METADATA_CLEANER_COMMITS_RETAINED to 20? I understand it may not give full coverage, but at least it gives some buffer during restore.

prashantwason and others added 5 commits July 20, 2023 17:14
Renamed withMaxInitParallelism as it is only for RI
  public static final ConfigProperty<Integer> RECORD_INDEX_MAX_PARALLELISM = ConfigProperty
      .key(METADATA_PREFIX + ".max.init.parallelism")
      .defaultValue(100000)
      .sinceVersion("0.14.0")
Contributor:
Do we need a parallelism of 100000 ?

@danny0405 (Contributor) left a comment:
+1, except one confusion: why do we need a max init parallelism of 100000 for RLI?

@hudi-bot (Collaborator):

CI report:

@prashantwason (Member Author):

@danny0405 The max init values for the other indexes are too low (see HUDI-6553). Indexes are really useful for large datasets which have a large number of partitions and files. Assume a large dataset with 100K+ files. The default parallelism of the index initialization in code is around 200, which would take hours for the indexes to be built. With a large parallelism:

  1. The actual parallelism used is min(number_of_operations, 100,000)
  2. So for small datasets, the lower value is used
  3. For larger datasets, 100K is used

We routinely have datasets with over 1M files in them (as large as 6M files). I have tested with various parallelism values; it's not an exact science, but somewhere around 100,000 was where I got the fastest bootstrap of the indexes. Very large parallelism causes OOM and memory issues on Spark.

If you leave the default at 200, many people would report timeouts building indexes on larger tables.
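The effective-parallelism rule prashantwason describes is a simple clamp; as a sketch (class and method names hypothetical), the configured maximum only caps the parallelism, while small datasets use their own operation count:

```java
// Sketch of the effective-parallelism rule described above: the configured
// maximum (default 100000) only caps parallelism; small datasets use their
// own operation count. Names here are illustrative, not the Hudi code.
class RecordIndexInit {
  static final int DEFAULT_MAX_PARALLELISM = 100_000;

  static int effectiveParallelism(int numberOfOperations, int configuredMax) {
    return Math.min(numberOfOperations, configuredMax);
  }
}
```

So a 5,000-file dataset would initialize with parallelism 5,000, while a 6M-file dataset is capped at 100,000, which is why the large default does not hurt small tables.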

@danny0405 danny0405 merged commit 629349c into apache:master Jul 22, 2023