
[python legacy] use process pool instead of thread pool for files planning #4745

Merged
rdblue merged 1 commit into apache:master from puchengy:legacy-py-process-pool
May 18, 2022

Conversation

@puchengy (Contributor) commented May 11, 2022

Based on profiling, plan_files() spends most of its time on copying and file reading, both of which are CPU-bound operations. However, the current implementation uses a thread pool rather than a process pool, which does not speed up the operation at all.

This diff proposes switching to a process pool instead of a thread pool.

Flame graph: profile.svg (attached image)

Command
py-spy record -o profile.svg -- python myprogram.py

Code

from iceberg.hive import HiveTables

# Connect to the Hive metastore and enable worker-pool scan planning.
conf = {
    "hive.metastore.uris": "xxxx",              # metastore URI elided
    "iceberg.scan.plan-in-worker-pool": True,   # run plan_files() in a pool
    "iceberg.worker.num-threads": 4,            # pool size
}
tables = HiveTables(conf)
table = tables.load("abc.xyz")  # database.table
scan = table.new_scan()
files = scan.plan_files()       # the operation being profiled

@samredai (Contributor) commented May 11, 2022

Looking at the function passed to the pool (get_scans_for_manifest), I can understand how this improves the parsing and evaluator logic. For the Avro file reads I'm not sure; I'd expect those to scale better with multithreading since they're I/O bound. @Fokko, is it possible that threading may be better here beyond a certain number of files?

rdblue closed this May 11, 2022
rdblue reopened this May 11, 2022
@rdblue (Contributor) commented May 11, 2022

Sorry, I didn't mean to close this. That was an accident.

@Fokko (Contributor) left a comment

Looks good. If we run into memory issues, we could also replace the current map function:

if self.ops.conf.get(SCAN_THREAD_POOL_ENABLED):
    with Pool(self.ops.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
                                cpu_count())) as reader_scan_pool:
        return itertools.chain.from_iterable([scan for scan
                                              in reader_scan_pool.map(self.get_scans_for_manifest,
                                                                      matching_manifests)])
else:
    return itertools.chain.from_iterable([self.get_scans_for_manifest(manifest)
                                          for manifest in matching_manifests])

Replace map with its lazy sibling imap:
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.imap

import itertools
import logging
from multiprocessing import cpu_count
from multiprocessing.dummy import Pool

nit: we could combine the imports:

from multiprocessing import cpu_count, Pool

@Fokko (Contributor) commented May 12, 2022

@samredai

Looking at the function passed to the pool (get_scans_for_manifest) I can understand how this improves the parsing and evaluator logic. For the avro file reads, I'm not sure but I'd expect this would scale better with multithreading since that's I/O bound. @Fokko is it possible that threading may be better here beyond a certain number of files?

I think it would make sense to parallelize that as well to reduce the runtime of the function. Especially if we need to fetch files from external storage, which introduces some latency, I would parallelize those kinds of calls as well to improve overall throughput.

@kbendick (Contributor) left a comment

Overall I think this looks good as well.

I don't know if scrutinizing it super closely for corner cases is necessary, given that we've seen the performance gains and this is the legacy Python project.

I've put out some calls for testing in the community to people who I know actively use the python_legacy project a lot. If there's any negative feedback, we can always revert if need be. 👍

Good find @puchengy!

@chyzzqo2 (Contributor) commented:

In general, in Python it's hard to deal with libraries that make this choice for you. Parallelism via threads and parallelism via processes don't mix well, so the choice is better left to the application rather than the library. Can we make the choice configurable?

@samredai (Contributor) commented:

@chyzzqo2 this is a great point. We already have the iceberg.scan.plan-in-worker-pool config property that enables this code path. Maybe we can add a config along the lines of iceberg.scan.plan-in-worker-pool.type with values of "threads" or "processes". Also, I just noticed that "threads" is in the config property name for pool sizes (here), which can be confusing if processes are actually being used.

@rdblue (Contributor) commented May 18, 2022

I think we want to follow up and implement @samredai's suggestion to add a pool type that controls this. We can do that in a follow-up, though. In the meantime, I'll merge this.

rdblue merged commit 3586e14 into apache:master May 18, 2022
@rdblue (Contributor) commented May 18, 2022

Thanks, @puchengy!

puchengy deleted the legacy-py-process-pool branch May 18, 2022 17:18
@puchengy (Contributor, Author) commented:

@rdblue Thanks, I will have a follow-up PR for that.

8 participants