-
Notifications
You must be signed in to change notification settings - Fork 4.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HIVE-28276: Iceberg:Make Iceberg split threads configurable when table scanning #5260
Conversation
final ExecutorService workerPool = | ||
ThreadPools.newWorkerPool("iceberg-plan-worker-pool", | ||
conf.getInt(InputFormatConfig.TABLE_PLAN_WORKER_POOL_SIZE, ThreadPools.WORKER_THREAD_POOL_SIZE)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this work
final ExecutorService workerPool =
ThreadPools.newWorkerPool("iceberg-plan-worker-pool", SystemConfigs.WORKER_THREAD_POOL_SIZE.value());
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can not set the thread pool size in seesion connection.
What i want to do is that we can set the scan thread pool size in seesion, e.g. we can set iceberg.worker.num-threads=8
in beeline console.
And if users don't set the scan thread pool size, the size will follow the current default logic to initialize a thread size which is equal system cores.
if (fromVersion != -1) { | ||
scan = applyConfig(conf, createIncrementalAppendScan(table, conf)); | ||
scan = applyConfig(conf, createIncrementalAppendScan(table, conf)).planWith(workerPool); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to shutdown this workerPool
in the finally
block
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think no need. I think iceberg side has done the others.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just double check once, that iceberg takes care of that & there is no leak here before committing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense. Let me double check again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checked iceberg code, and didn't find any code snippet about workPool
shutdown. So i think you are right, shutdown this workerPool in the finally block is a more better way. Like flink iceberg side:
https://github.com/apache/iceberg/blob/cbb853073e681b4075d7c8707610dceecbee3a82/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java#L92-L101
Change in this commit ef6d684
Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thnx, looks good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe if the value configured is 1, we might not need to initialize this executor service and go with normal flow
@ayushtkn I think I got mixed up. :( The scan is in Tez AM side not HS2 side. I will double check this PR later. Make it WIP... |
The iceberg api side will always initialize the executor service, so we can not do other things.
This PR is aimed for Tez AM, as the scan logic is on Tez AM side. This PR can let hive users to configure the Tez AM scan thread size. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
Outdated
Show resolved
Hide resolved
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need for casting
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, pending tests
|
What changes were proposed in this pull request?
I have noticed that if a iceberg table has lots of metadata/data files, iceberg will use many system cores to do scan planning, which may put some pressure on Tez AM memory/cpu.
We can try to make the thread pool size configurable, to avoid concurrency pressure on Tez AM.
Other OSS also did similar optimization, like FIink:
https://github.com/apache/iceberg/blob/cbb853073e681b4075d7c8707610dceecbee3a82/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java#L92-L100
Why are the changes needed?
This can configure the icbeberg planning thread size, to reduce Tez AM memory/cpu pressure as well as reduce some pressure on operating system.
If we don't limit the thread pool size, it wii use a default value which is equal system cores when doing
scan.planTasks()
:,https://github.com/apache/iceberg/blob/9a5d24fee239352021a9a73f6a4cad8ecf464f01/core/src/main/java/org/apache/iceberg/SystemConfigs.java#L38-L43
Does this PR introduce any user-facing change?
No
Is the change a dependency upgrade?
No
How was this patch tested?
Existing test.