-
Notifications
You must be signed in to change notification settings - Fork 41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feature: support partition concurrent compute #177
Conversation
9c5719d
to
a16e64b
Compare
Codecov Report
@@ Coverage Diff @@
## master #177 +/- ##
============================================
- Coverage 87.32% 86.75% -0.57%
- Complexity 3151 3155 +4
============================================
Files 332 334 +2
Lines 11806 11897 +91
Branches 1053 1053
============================================
+ Hits 10309 10321 +12
- Misses 992 1069 +77
- Partials 505 507 +2
Continue to review full report at Codecov.
|
2250d87
to
db82aa5
Compare
int partitionId = this.partitioner.partitionId(vertex.id()); | ||
WriteBuffers buffer = this.buffers.get(partitionId); | ||
|
||
synchronized (buffer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to use a more fine-grained lock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have any good ideas. Both writingBuffer
and sortingBuffer
may operated by this method, so I think WriteBuffers
is the minimum granularity of lock.
computer-core/src/main/java/com/baidu/hugegraph/computer/core/sender/MessageSendManager.java
Outdated
Show resolved
Hide resolved
computer-core/src/main/java/com/baidu/hugegraph/computer/core/sender/MessageSendManager.java
Outdated
Show resolved
Hide resolved
computer-core/src/main/java/com/baidu/hugegraph/computer/core/sort/sorter/JavaInputSorter.java
Show resolved
Hide resolved
|
||
private static List<KvEntry> threadSortList() { | ||
List<KvEntry> list = SORT_LOCAL.get(); | ||
list.clear(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it may cause all KvEntry released
@javeme |
@corgiboygsj Do you mean let receive-tasks and computing-tasks share the same thread pool? |
|
||
public final class Consumers<V> { | ||
|
||
private static final int CPU_CORE_NUM = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use a const instead to avoid container trap: aa80c09#diff-20996cff4e37b7ee95339fe118177993577be083e6034208aa0bb49ff29dd0ecR40
db82aa5
to
1085cee
Compare
5f4397f
to
8cc49b6
Compare
import com.baidu.hugegraph.computer.core.common.ComputerContext; | ||
import com.baidu.hugegraph.computer.core.receiver.MessageStat; | ||
|
||
public class MessageSendPartition { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer rename to ThreadWriteBuffers
this.buffers = new ConcurrentHashMap<>(); | ||
} | ||
|
||
public WriteBuffers get() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to buffersForCurrentThread()?
public void resetMessageWritten() { | ||
for (WriteBuffers buffer : this.buffers.values()) { | ||
buffer.resetMessageWritten(); | ||
} | ||
} | ||
|
||
public MessageStat messageWritten() { | ||
MessageStat partitionStat = new MessageStat(); | ||
for (WriteBuffers buffer : this.buffers.values()) { | ||
partitionStat.increase(buffer.messageWritten()); | ||
} | ||
return partitionStat; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
may access by multi threads?
Due to the lack of activity, the current pr is marked as stale and will be closed after 180 days, any update will remove the stale label |
computer-core/src/main/java/com/baidu/hugegraph/computer/core/sort/sorter/JavaInputSorter.java
Show resolved
Hide resolved
computer-core/src/main/java/com/baidu/hugegraph/computer/core/sort/sorter/JavaInputSorter.java
Outdated
Show resolved
Hide resolved
computer-core/src/main/java/com/baidu/hugegraph/computer/core/compute/ComputeManager.java
Outdated
Show resolved
Hide resolved
|
||
private final ComputerContext context; | ||
private final Managers managers; | ||
|
||
private final Map<Integer, FileGraphPartition<M>> partitions; | ||
private final Computation<M> computation; | ||
private final Map<Integer, FileGraphPartition> partitions; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it right: multi threads may handle one partition, one MessageSendPartition hold multi buffers, one buffer each thread
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
if (!buffer.isEmpty()) { | ||
buffer.prepareSorting(); | ||
futures.add(this.sortThenSend(partitionId, type, buffer)); | ||
for (WriteBuffers buffer : partition.buffers()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we ensure no thread access partition.buffers any more?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method will only be called by main thread
CLA Assistant Lite bot: I have read the CLA Document and I hereby sign the CLA You can retrigger this bot by commenting recheck in this Pull Request |
212bd8f
to
89e92bc
Compare
computer-core/src/main/java/com/baidu/hugegraph/computer/core/sort/sorter/JavaInputSorter.java
Show resolved
Hide resolved
computer-test/src/main/java/com/baidu/hugegraph/computer/core/compute/input/EdgesInputTest.java
Show resolved
Hide resolved
computer-core/src/main/java/com/baidu/hugegraph/computer/core/sender/MessageSendPartition.java
Outdated
Show resolved
Hide resolved
public void clear() { | ||
this.buffers.clear(); | ||
} | ||
|
||
public void resetMessageWritten() { | ||
for (WriteBuffers buffer : this.buffers.values()) { | ||
buffer.resetMessageWritten(); | ||
} | ||
} | ||
|
||
public MessageStat messageWritten() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add synchronized for the 3 methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can do this, but they are only accessed by the main thread
computer-core/src/main/java/com/baidu/hugegraph/computer/core/sort/sorter/JavaInputSorter.java
Outdated
Show resolved
Hide resolved
2956516
to
09b0e51
Compare
ci error: |
09b0e51
to
bca36db
Compare
Codecov Report
@@ Coverage Diff @@
## master #177 +/- ##
============================================
- Coverage 87.37% 86.73% -0.64%
- Complexity 3153 3167 +14
============================================
Files 332 334 +2
Lines 11806 11962 +156
Branches 1053 1068 +15
============================================
+ Hits 10315 10375 +60
- Misses 988 1073 +85
- Partials 503 514 +11
Continue to review full report at Codecov.
|
implement #167