
[HUDI-7906] improve the parallelism deduce in rdd write #11470

Merged
merged 1 commit into apache:master from clear-parallelism on Jun 22, 2024

Conversation

KnightChess (Contributor)

Change Logs

As #11274 and #11463 describe, there are two problems:

  • If the RDD is an input RDD that has not been shuffled, its partition number can be far too large or far too small.
  • Users cannot easily control it:
    • in some cases users can set spark.default.parallelism to change it;
    • in other cases users cannot change it because the parallelism is hard-coded;
    • in Spark, the better approach is to let spark.default.parallelism or spark.sql.shuffle.partitions control it, keeping Hudi-specific settings as advanced overrides.

Impact

For operations that use the new deduction logic, such as dedup, users can control the parallelism with spark.sql.shuffle.partitions or spark.default.parallelism.
For special scenarios, the advanced parameters can still be used; see the sketch below.
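A minimal sketch of how a user would steer the deduced parallelism after this change (the class name is ours; the config keys are standard Spark ones and the values are illustrative):

```java
import org.apache.spark.SparkConf;

public class ParallelismConfigExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("hudi-write")
        // Either of these now bounds the parallelism Hudi deduces for RDD writes:
        .set("spark.sql.shuffle.partitions", "200")  // consulted first when set
        .set("spark.default.parallelism", "200");    // fallback for plain RDD jobs
  }
}
```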

Risk level (write none, low, medium, or high below)

low

Documentation Update

None

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:M (PR with lines of changes in (100, 300]) label Jun 18, 2024

if (SQLConf.get().contains(SQLConf.SHUFFLE_PARTITIONS().key())) {
return SQLConf.get().defaultNumShufflePartitions();
} else if (rddData.context().conf().contains("spark.default.parallelism")) {
danny0405 (Contributor)

The Java context may never have this config option, right?

KnightChess (Contributor, Author)

Do you mean the JavaSparkContext? It does contain it; I have updated the UT conf.

danny0405 (Contributor)

Why would the Java client contain a Spark config option? That does not seem reasonable.

KnightChess (Contributor, Author)

@danny0405 the Java RDD wraps a Spark RDD, and this code runs in the Spark driver, so the context is available.

danny0405 (Contributor)

Oh, my mistake, it's an RDD.
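For reference, the deduction logic discussed in this exchange reads roughly as follows (a sketch reconstructed from the snippet quoted above; the wrapper class and the final fallback branch are our assumptions, not verbatim PR code):

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.internal.SQLConf;

// Deduction order: SQL shuffle partitions, then spark.default.parallelism,
// then the RDD's own partition count (assumed fallback).
class ParallelismDeduction {
  private final JavaRDD<?> rddData;

  ParallelismDeduction(JavaRDD<?> rddData) {
    this.rddData = rddData;
  }

  int deduceNumPartitions() {
    if (SQLConf.get().contains(SQLConf.SHUFFLE_PARTITIONS().key())) {
      return SQLConf.get().defaultNumShufflePartitions();
    } else if (rddData.context().conf().contains("spark.default.parallelism")) {
      return rddData.context().defaultParallelism();
    }
    return rddData.getNumPartitions(); // assumed fallback
  }
}
```

This runs on the Spark driver, which is why the JavaSparkContext wrapped by the RDD still exposes the Spark config, as discussed above.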

@danny0405 danny0405 added resource-and-concurrency performance usability priority:critical production down; pipelines stalled; Need help asap. labels Jun 19, 2024
@@ -144,7 +144,7 @@ protected HoodiePairData<HoodieKey, HoodieRecordLocation> fetchRecordLocationsFo
       HoodieData<HoodieKey> hoodieKeys, HoodieEngineContext context, HoodieTable hoodieTable,
       int parallelism) {
     List<String> affectedPartitionPathList =
-        hoodieKeys.map(HoodieKey::getPartitionPath).distinct().collectAsList();
+        hoodieKeys.map(HoodieKey::getPartitionPath).distinct(hoodieKeys.deduceNumPartitions()).collectAsList();

Why can't we use the parallelism value received as input in the distinct(), i.e.
hoodieKeys.map(HoodieKey::getPartitionPath).distinct(parallelism).collectAsList();

KnightChess (Contributor, Author), Jun 20, 2024

In the case you hit, the parallelism would become too large if we used Spark's own default inference logic.
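To see why, note that in plain Spark, distinct() with no explicit partition count inherits the upstream partitioning, so a wide input RDD produces an equally wide shuffle. A minimal standalone sketch (the path and split counts are hypothetical):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DistinctParallelismDemo {
  public static void main(String[] args) {
    JavaSparkContext jsc =
        new JavaSparkContext(new SparkConf().setAppName("demo").setMaster("local[*]"));
    // Hypothetical input read with 4000 splits.
    JavaRDD<String> keys = jsc.textFile("hdfs://.../keys", 4000);
    long inherited = keys.distinct().count();  // shuffle width follows the input: 4000 partitions
    long pinned = keys.distinct(200).count();  // shuffle width pinned by the caller: 200 partitions
    jsc.stop();
  }
}
```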

KnightChess (Contributor, Author)

Looks like a flaky test OOM; other PRs have the same problem.

@KnightChess KnightChess reopened this Jun 21, 2024
yihua commented Jun 21, 2024

> Looks like a flaky test OOM; other PRs have the same problem.

I also noticed that GitHub CI frequently fails due to OOM now. I'm going to triage the offending commit on master.

KnightChess commented Jun 21, 2024

@yihua hi, I've done some troubleshooting, but I've hit some issues I'm not familiar with yet. I'll raise them in the hope that they are helpful to you.
Reproduce:

  • TestSparkDataSource.testCoreFlow, VM options: -Xmx1g -Xms128m
    Heap dump analysis:
  1. There are four tasks, and every task holds three HoodieLogFileReader instances; I found that HoodieLogFormatReverseReader holds the previous reader, and I don't know the reason.
  2. So I set ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN to true to avoid using it, but this param did not take effect. I updated the raw code to make it work, but then the UT failed at compareUpdateDfWithHudiDf(inputDf2, snapshotDf3, snapshotRows2, colsToSelect), and I don't know the reason.
  3. I suspect the problem is the file group reader; with hoodie.file.group.reader.enabled set to false, everything is OK.

Finally:
env VM options: -Xmx1g -Xms128m

ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN || hoodie.file.group.reader.enabled || UT result
true || false || success
true || true || ut_error
false || false || success
false || true || oom

Hope this helps you.

KnightChess (Contributor, Author)
check_oom.patch

yihua commented Jun 21, 2024

> (quoting KnightChess's troubleshooting report above)

Thanks for the details. These are definitely helpful. I'll check for any leaks in the new file group reader.

yihua commented Jun 22, 2024

Hi @KnightChess somehow I could not reproduce OOM locally in my IntelliJ, but I figured out that the OOM is likely due to Spark's file index holding Hudi's table metadata instance, which caches the HFile readers for reading the metadata table. The new HFile reader, HoodieNativeAvroHFileReader, holds a reference to a shared underlying HFile reader that occupies memory. I'm working on a fix. To unblock CI, I've put up a PR to disable the new HFile reader by default: #11488.
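The retention chain described here is a generic caching pitfall: a long-lived cache entry keeps a reader alive, and the reader keeps a large shared structure alive. A schematic sketch of the pattern (hypothetical classes, not Hudi's real API):

```java
import java.util.HashMap;
import java.util.Map;

// Schematic of the leak pattern: cache -> reader -> shared buffer stays reachable.
class SharedFileHandle {
  final byte[] blockCache = new byte[64 << 20]; // 64 MB retained per handle
}

class CachedReader {
  final SharedFileHandle shared;
  CachedReader(SharedFileHandle s) { this.shared = s; }
}

class MetadataReaderCache {
  private final Map<String, CachedReader> readerCache = new HashMap<>();

  CachedReader reader(String file) {
    // Each cached reader pins its shared handle for the cache's lifetime.
    return readerCache.computeIfAbsent(file, f -> new CachedReader(new SharedFileHandle()));
  }

  void close() {
    readerCache.clear(); // unless called, the whole chain stays reachable
  }
}
```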

KnightChess (Contributor, Author) left a comment

@yihua I tried setting hoodie.hfile.use.native.reader to false locally with -Xmx1g, and the UT succeeds as well. Thanks for the fix; retrying this PR's CI.

@KnightChess KnightChess reopened this Jun 22, 2024
KnightChess commented Jun 22, 2024

@yihua Following your line of thinking, I looked at the other objects. Indeed, this object also accounts for a lot of memory. At first I only paid attention to the largest one, but these also hold 32% of the used memory, and they all appear to have different memory addresses.
[heap-dump screenshots]

KnightChess (Contributor, Author)

@yihua it looks like there is still an OOM problem; the CI environment is hard to simulate.

KnightChess (Contributor, Author)

Oh, sorry, my mistake: the PR with your fix has not been merged yet. Ignore this.

yihua commented Jun 22, 2024

> Oh, sorry, my mistake: the PR with your fix has not been merged yet. Ignore this.

No worries, it was merged just 20 minutes ago.

yihua commented Jun 22, 2024

> (quoting KnightChess's comment above about the other objects holding memory)

Thanks for sending the heap dump. Yes, I also observed high memory usage of HoodieNativeAvroHFileReader on my side.

hudi-bot

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@danny0405 danny0405 merged commit 51c9c0e into apache:master Jun 22, 2024
46 checks passed
xuzifu666 (Contributor)

@KnightChess Sorry to add a comment for feedback: I gave it a try, but it does not seem to work as before. Is some extra config change needed?

KnightChess (Contributor, Author)

@xuzifu666 have you set spark.default.parallelism or spark.sql.shuffle.partitions?

xuzifu666 (Contributor)

> @xuzifu666 have you set spark.default.parallelism or spark.sql.shuffle.partitions?

Yes, spark.default.parallelism=2000 and spark.sql.shuffle.partitions=2000.

KnightChess (Contributor, Author)

@xuzifu666 can you share the full Spark UI info? In my test it works fine; otherwise, please share how to reproduce it.
[Spark UI screenshot]

bibhu107

Hi @KnightChess, which release will this fix be in?

KnightChess (Contributor, Author)

@bibhu107 0.16.0 and 1.0.0, but you can cherry-pick it or copy it into your version.

@KnightChess KnightChess deleted the clear-parallelism branch July 2, 2024 14:50