feat: implement parallel send data in load graph step#248
feat: implement parallel send data in load graph step#248imbajin merged 8 commits intoapache:masterfrom
Conversation
| Vertex vertex = iterator.next(); | ||
| this.sendManager.sendEdge(vertex); | ||
| futures.add(CompletableFuture.runAsync(() -> consumer.accept(vertex), | ||
| this.sendExecutor)); |
There was a problem hiding this comment.
prefer to align with CompletableFuture
| sendThreadNum); | ||
| } | ||
|
|
||
| private Integer inputLoaderThreadNum(Config config) { |
There was a problem hiding this comment.
inputLoaderThreadNum => inputSendThreadNum ?
|
Hi, @javeme , I find that create a future for each vertex(edge) will cause OOM, I will change the implementation to parallel handle |
Codecov Report
@@ Coverage Diff @@
## master #248 +/- ##
============================================
+ Coverage 85.78% 85.82% +0.04%
- Complexity 3230 3237 +7
============================================
Files 344 344
Lines 12076 12105 +29
Branches 1088 1090 +2
============================================
+ Hits 10359 10389 +30
+ Misses 1190 1188 -2
- Partials 527 528 +1
... and 6 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
|
|
||
| this.loadServices = new ArrayList<>(this.sendThreadNum); | ||
| for (int i = 0; i < this.sendThreadNum; i++) { | ||
| this.loadServices.add(new LoadService(context)); |
There was a problem hiding this comment.
seems don't need to create multi LoadService, just try calling LoadService.createIteratorFromVertex() with sendThreadNum times.
There was a problem hiding this comment.
Hi, @javeme , thanks for your comment, I move fetcher into IteratorFromVertex and IteratorFromEdge, also replace creation of multi LoadService as you suggested.
computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java
Outdated
Show resolved
Hide resolved
computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java
Outdated
Show resolved
Hide resolved
computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java
Outdated
Show resolved
Hide resolved
computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java
Outdated
Show resolved
Hide resolved
| */ | ||
| private GraphFetcher fetcher; | ||
|
|
||
| public IteratorFromVertex(Config config, InputSplitRpcService rpcService) { |
There was a problem hiding this comment.
can we just pass into the GraphFetcher fetcher arg
There was a problem hiding this comment.
It seems that we can't, fetcher has property localBatch, different threads can not share the same localBatch.
There was a problem hiding this comment.
we can create GraphFetcher when new IteratorFromVertex()
There was a problem hiding this comment.
Do you mean like this?
public Iterator<Vertex> createIteratorFromVertex() {
GraphFetcher fetcher = InputSourceFactory.createGraphFetcher(this.config, this.rpcService);
return new IteratorFromVertex(fetcher);
}
computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java
Outdated
Show resolved
Hide resolved
ab11331 to
5f74db7
Compare
computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java
Show resolved
Hide resolved
computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java
Outdated
Show resolved
Hide resolved
computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java
Outdated
Show resolved
Hide resolved
|
ci error: Error: Tests run: 563, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 218.399 s <<< FAILURE! - in org.apache.hugegraph.computer.suite.unit.UnitTestSuite
Error: testRunAlgorithmFromHdfs(org.apache.hugegraph.computer.algorithm.centrality.degree.DegreeCentralityTest) Time elapsed: 4.379 s <<< FAILURE!
java.lang.AssertionError: [java.util.concurrent.CompletionException: org.apache.hugegraph.computer.core.common.exception.ComputerException: An exception occurred during parallel sending edges, null]
at org.apache.hugegraph.computer.algorithm.centrality.degree.DegreeCentralityTest.testRunAlgorithmFromHdfs(DegreeCentralityTest.java:50)
[INFO]
[INFO] Results:
[INFO]
Error: Failures:
Error: DegreeCentralityTest.testRunAlgorithmFromHdfs:50->AlgorithmTestBase.runAlgorithm:147 [java.util.concurrent.CompletionException: org.apache.hugegraph.computer.core.common.exception.ComputerException:
An exception occurred during parallel sending edges, null]
[INFO]
Error: Tests run: 563, Failures: 1, Errors: 0, Skipped: 0 |
Hi, @javeme , I've checked recent CI logs, the error is caused by |
|
Sry, it's my own fault, I'm doing some code profiling now ... |
|
Hi, @javeme , I've careful debug, and find the real reason is that Thus, I use a fetcher array to manage all fetchers in |
javeme
left a comment
There was a problem hiding this comment.
Great, it's clear now.
Thanks for your analysis and feedback of the root cause.
computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java
Show resolved
Hide resolved
computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java
Outdated
Show resolved
Hide resolved
computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java
Outdated
Show resolved
Hide resolved
computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java
Outdated
Show resolved
Hide resolved
| public void init() { | ||
| assert this.rpcService != null; | ||
| for (int i = 0; i < this.fetcherNum; i++) { | ||
| this.fetchers[i] = InputSourceFactory.createGraphFetcher(this.config, this.rpcService); |
There was a problem hiding this comment.
could you please also add some comment for the reason why we hold a fetchers array
Purpose of the PR
Main Changes
Verifying these changes
Does this PR potentially affect the following parts?
Documentation Status
Doc - TODODoc - DoneDoc - NO Need