Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: improve spark parallel #450

Merged
merged 1 commit into from Apr 7, 2023
Merged

chore: improve spark parallel #450

merged 1 commit into from Apr 7, 2023

Conversation

zyxxoo
Copy link
Contributor

@zyxxoo zyxxoo commented Apr 6, 2023

No description provided.

LOG.info("\n Start to load data using spark bulkload \n");
// gen-hfile
HBaseDirectLoader directLoader = new HBaseDirectLoader(loadOptions, struct,
loadDistributeMetrics);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

很奇怪这里的 loadDistributeMetrics, 这个代码是跑在算子里面的,我理解应该是算子获取的是这个方法的备份吧?spark 怎么把这个传到 drive 里面来呢?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@imbajin imbajin requested review from simon824 and imbajin April 6, 2023 10:48
@codecov
Copy link

codecov bot commented Apr 6, 2023

Codecov Report

Merging #450 (42927ce) into master (36a1ada) will decrease coverage by 0.05%.
The diff coverage is 0.00%.

❗ Current head 42927ce differs from pull request most recent head dad4504. Consider uploading reports for the commit dad4504 to get more accurate results

@@             Coverage Diff              @@
##             master     #450      +/-   ##
============================================
- Coverage     62.57%   62.52%   -0.05%     
+ Complexity     1867      894     -973     
============================================
  Files           260       91     -169     
  Lines          9418     4395    -5023     
  Branches        872      516     -356     
============================================
- Hits           5893     2748    -3145     
+ Misses         3143     1444    -1699     
+ Partials        382      203     -179     
Impacted Files Coverage Δ
...e/hugegraph/loader/spark/HugeGraphSparkLoader.java 0.00% <0.00%> (ø)

... and 169 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

LoadContext context = initPartition(this.loadOptions, struct);
p.forEachRemaining((Row row) -> {
loadRow(struct, row, p, context);
Future<?> future = Executors.newCachedThreadPool().submit(() -> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

按我个人理解,这里并发应该没有线程安全问题了

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里用 cache threadpool,按我个人理解应该是加载多个文件,所以并行执行,生成多个 DAG,然后由 spark 去做调度具体任务,所以我这里没有考虑线程池大小

@simon824 simon824 merged commit cf1312e into master Apr 7, 2023
9 checks passed
@simon824 simon824 deleted the zy_dev branch April 7, 2023 01:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants