OLTP multi-thread Design Doc
Wu Chencan edited this page Dec 5, 2023 · 1 revision
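Support batched and multi-threaded concurrent execution in the algorithm implementations.
- Using the BFS mode of the Kout algorithm as an example, we analyze which parts can run in parallel and then present a concrete design.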
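- In the test graph, we start from vertex `marko` and look for the vertices that satisfy `depth=2` and `nearest=true`:
  - Stage 0: starting from `marko`, we obtain its neighbors `josh`, `lop`, and `vadas`;
  - Stage 1: the expansions from `josh`, `lop`, and `vadas` do not interfere with one another, so all three can be expanded at the same time on multiple threads, yielding the neighbors `ripple` and `peter`;
    - `lop` and `josh`, which are reached again at this point, are filtered out by the `nearest` parameter because they were already found at depth 1;
  - Stage 2: `depth` reaches the threshold, giving `ripple` and `peter` as the vertices at distance 2.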
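The stage-by-stage expansion above can be sketched as a level-synchronous BFS: every vertex of the current frontier is expanded on its own task, and a barrier waits for the whole level before the `nearest` filter runs. This is a minimal illustration under stated assumptions, not HugeGraph's code. The in-memory `ADJ` map is hypothetical (its edges follow the paths returned in the Kout responses on this page), the class `ParallelKoutSketch` and the use of `CompletableFuture` with a fixed thread pool are choices made for the sketch, and only `nearest=true` semantics are modelled.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelKoutSketch {

    // Hypothetical in-memory adjacency (direction BOTH); a real traverser
    // would query the storage backend instead.
    static final Map<String, List<String>> ADJ = Map.of(
            "marko", List.of("josh", "lop", "vadas"),
            "josh", List.of("marko", "ripple", "lop"),
            "lop", List.of("marko", "josh", "peter"),
            "vadas", List.of("marko"),
            "ripple", List.of("josh"),
            "peter", List.of("lop"));

    // Level-synchronous BFS with nearest=true semantics: each frontier vertex
    // is expanded on its own task, then the results are merged and filtered.
    static Set<String> kout(String source, int depth, ExecutorService pool) {
        Set<String> visited = new HashSet<>(Set.of(source));
        Set<String> frontier = Set.of(source);
        for (int level = 1; level <= depth; level++) {
            List<CompletableFuture<List<String>>> tasks = new ArrayList<>();
            for (String v : frontier) {
                // Expansions within one level are independent of each other.
                tasks.add(CompletableFuture.supplyAsync(
                        () -> ADJ.getOrDefault(v, List.of()), pool));
            }
            Set<String> next = new HashSet<>();
            for (CompletableFuture<List<String>> t : tasks) {
                next.addAll(t.join()); // barrier: wait for the whole level
            }
            next.removeAll(visited); // nearest=true: drop vertices seen at a smaller depth
            visited.addAll(next);
            frontier = next;
        }
        return frontier;
    }

    // Convenience overload that owns its thread pool.
    static Set<String> kout(String source, int depth) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            return kout(source, depth, pool);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(new TreeSet<>(kout("marko", 2))); // [peter, ripple]
    }
}
```

The barrier keeps the `nearest` check simple: filtering happens on one thread after all expansions of a level finish, so the shared `visited` set is never written concurrently.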
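- For the vertices to traverse, given as an `Iterator<Id> vertices`, we need a data structure that yields the adjacent `edges` of each `vertex` in turn, i.e., a nested iterator `Iterator<Iterator<Edge>>`:
  - the first-level `Iterator` iterates over the target `vertices`;
  - the second-level `Iterator` iterates over the adjacent `edges` of one specific `vertex`.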
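A minimal sketch of the two-level structure, assuming a hypothetical adjacency map in place of backend edge queries and plain `"source->target"` strings in place of `Edge` objects. The outer iterator yields one element per source vertex, and each element is that vertex's edge iterator. `NestedIteratorSketch`, `nestedEdges`, and `drain` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.StreamSupport;

public class NestedIteratorSketch {

    // Hypothetical adjacency map standing in for per-vertex edge queries.
    static final Map<String, List<String>> ADJ = Map.of(
            "josh", List.of("marko", "ripple", "lop"),
            "vadas", List.of("marko"));

    // First level: one element per source vertex.
    // Second level: that vertex's edges, as "source->target" strings.
    static Iterator<Iterator<String>> nestedEdges(Iterator<String> vertices) {
        return StreamSupport.stream(
                        Spliterators.spliteratorUnknownSize(vertices, Spliterator.ORDERED),
                        false)
                .map(v -> ADJ.getOrDefault(v, List.of()).stream()
                        .map(w -> v + "->" + w)
                        .iterator())
                .iterator();
    }

    // Walks both levels and returns the edges grouped by source vertex.
    static List<List<String>> drain(Iterator<Iterator<String>> nested) {
        List<List<String>> groups = new ArrayList<>();
        while (nested.hasNext()) {            // first level: vertices
            Iterator<String> edges = nested.next();
            List<String> group = new ArrayList<>();
            while (edges.hasNext()) {         // second level: edges of one vertex
                group.add(edges.next());
            }
            groups.add(group);
        }
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(drain(nestedEdges(List.of("josh", "vadas").iterator())));
        // [[josh->marko, josh->ripple, josh->lop], [vadas->marko]]
    }
}
```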
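```java
// HugeTraverser.java (excerpt)
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.tinkerpop.gremlin.structure.Edge;
// Id, Directions, EdgesQueryIterator and CloseableIterator are HugeGraph types.

// Wraps one edge query per source vertex into a nested edge iterator.
public EdgesIterator edgesOfVertices(Iterator<Id> sources,
                                     Directions dir,
                                     List<Id> labelIds,
                                     long degree) {
    return new EdgesIterator(new EdgesQueryIterator(sources, dir,
                                                    labelIds, degree));
}

// Declared inside HugeTraverser, so graph() resolves to the enclosing traverser.
public class EdgesIterator implements Iterator<Iterator<Edge>>, Closeable {

    private final Iterator<Iterator<Edge>> currentIt;

    // highlight
    public EdgesIterator(EdgesQueryIterator queryIterator) {
        // Run the edge query of every source vertex and keep each result iterator.
        List<Iterator<Edge>> iteratorList = new ArrayList<>();
        while (queryIterator.hasNext()) {
            iteratorList.add(graph().edges(queryIterator.next()));
        }
        this.currentIt = iteratorList.iterator();
    }

    @Override
    public boolean hasNext() {
        return this.currentIt.hasNext();
    }

    // Returns the edge iterator of the next source vertex.
    @Override
    public Iterator<Edge> next() {
        return this.currentIt.next();
    }

    @Override
    public void close() throws IOException {
        CloseableIterator.closeIterator(currentIt);
    }
}
```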
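- `EdgesQueryIterator`: generates a `queryIterator` from `vertices`, which yields the edge query `query` for each `vertex`;
- `EdgesIterator`: runs each `query` from that iterable sequence and obtains, for each `vertex`, an iterator over its `edges`.
- For the part marked with the `highlight` comment, the internal version of the `server` has a direct interface in the backend storage: given an `EdgesQueryIterator`, it queries and returns a nested edge iterator `Iterator<Iterator<Edge>>` directly.
- This implementation uses a simplified approach instead: it iterates over the queries, runs each `query`, stores the results in a `list`, and returns the `list`'s iterator, which yields the nested iterator.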
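The simplified approach runs every query while the `EdgesIterator` is being constructed. The sketch below contrasts that eager construction with a lazy alternative that runs a query only when its iterator is requested; a call counter shows when queries execute. The generic helpers `eager` and `lazy` and the fake `query` function are hypothetical and not part of HugeGraph; the lazy variant is shown only for comparison.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class EagerVsLazySketch {

    // Eager: run every query up front and keep the result iterators in a list,
    // mirroring the simplified EdgesIterator constructor.
    static <Q, E> Iterator<Iterator<E>> eager(Iterator<Q> queries,
                                              Function<Q, Iterator<E>> run) {
        List<Iterator<E>> results = new ArrayList<>();
        while (queries.hasNext()) {
            results.add(run.apply(queries.next()));
        }
        return results.iterator();
    }

    // Lazy alternative: a query runs only when the caller asks for its iterator.
    static <Q, E> Iterator<Iterator<E>> lazy(Iterator<Q> queries,
                                             Function<Q, Iterator<E>> run) {
        return new Iterator<>() {
            @Override
            public boolean hasNext() {
                return queries.hasNext();
            }

            @Override
            public Iterator<E> next() {
                return run.apply(queries.next());
            }
        };
    }

    // Hypothetical "backend query": counts executions and returns a fake edge.
    static final AtomicInteger CALLS = new AtomicInteger();

    static Iterator<String> query(String vertex) {
        CALLS.incrementAndGet();
        return List.of(vertex + "->x").iterator();
    }

    // Returns how many queries have run right after the nested iterator is built.
    static int callsAfterBuilding(boolean eagerMode) {
        CALLS.set(0);
        Iterator<String> vertices = List.of("josh", "lop", "vadas").iterator();
        if (eagerMode) {
            eager(vertices, EagerVsLazySketch::query);
        } else {
            lazy(vertices, EagerVsLazySketch::query);
        }
        return CALLS.get();
    }

    public static void main(String[] args) {
        System.out.println(callsAfterBuilding(true));  // 3
        System.out.println(callsAfterBuilding(false)); // 0
    }
}
```

With the eager form, all queries have run, and all their result iterators (which may hold backend resources) exist, before the first vertex is consumed; the lazy form defers that work to whichever thread calls `next()`.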
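```java
// OltpTraverser.java (excerpt)
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
// Consumers is HugeGraph's worker-pool helper; executors and getConsumers()
// are not shown in this excerpt.

protected <K> long traverseBatch(Iterator<Iterator<K>> iterator,
                                 Consumer<Iterator<K>> consumer,
                                 String name, int queueWorkerSize) {
    if (!iterator.hasNext()) {
        return 0L;
    }
    // Flag that lets the producer loop stop early.
    AtomicBoolean done = new AtomicBoolean(false);
    Consumers<Iterator<K>> consumers = null;
    try {
        consumers = getConsumers(consumer, queueWorkerSize, done,
                                 executors.getExecutor());
        return consumersStart(iterator, name, done, consumers);
    } finally {
        // Null check instead of `assert`: if getConsumers() throws,
        // consumers is still null and would otherwise cause an NPE here.
        if (consumers != null) {
            executors.returnExecutor(consumers.executor());
        }
    }
}

private <K> long consumersStart(Iterator<Iterator<K>> iterator, String name,
                                AtomicBoolean done,
                                Consumers<Iterator<K>> consumers) {
    long total = 0L;
    try {
        consumers.start(name);
        // Producer loop: hand each inner iterator (one vertex's edges) to the workers.
        while (iterator.hasNext() && !done.get()) {
            total++;
            Iterator<K> v = iterator.next();
            consumers.provide(v);
        }
    }
    ...
    return total;
}
```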
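- The `traverseBatch` method carries out batched, multi-threaded concurrent execution:
  - parameter `iterator`: the nested `Iterator<Iterator<K>>` holding the adjacent `edges` of each vertex in `vertices`;
  - parameter `consumer`: a functional consumer interface that accepts an `Iterator<K>` and runs the predefined per-vertex logic;
  - parameters `name` and `queueWorkerSize`: `name` is passed to `consumers.start(name)`, and `queueWorkerSize` is passed to `getConsumers` together with an executor borrowed from `executors`.
- In `consumersStart`, tasks are handed to `consumers` via `provide`, and `consumers` consumes them on multiple threads.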
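The producer/consumer flow of `traverseBatch` and `consumersStart` can be sketched with standard `java.util.concurrent` classes. This is a stand-in built on assumptions, not HugeGraph's `Consumers` class: the bounded `ArrayBlockingQueue`, the `Optional.empty()` stop signal, and the error handling are choices made for this sketch, and `TraverseBatchSketch` is a hypothetical name. As in `consumersStart`, the calling thread provides one inner iterator at a time, stops early once `done` is set, and returns the number of inner iterators provided.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class TraverseBatchSketch {

    static <K> long traverseBatch(Iterator<Iterator<K>> iterator,
                                  Consumer<Iterator<K>> consumer,
                                  int workers) throws InterruptedException {
        if (!iterator.hasNext()) {
            return 0L;
        }
        AtomicBoolean done = new AtomicBoolean(false); // set after the first failure
        AtomicReference<RuntimeException> error = new AtomicReference<>();
        // Bounded queue keeps the producer from running far ahead of the workers;
        // Optional.empty() tells one worker that no more work is coming.
        BlockingQueue<Optional<Iterator<K>>> queue = new ArrayBlockingQueue<>(2 * workers);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                while (true) {
                    Optional<Iterator<K>> task = queue.take();
                    if (!task.isPresent()) {
                        return null;
                    }
                    if (done.get()) {
                        continue; // keep draining so the producer never blocks forever
                    }
                    try {
                        consumer.accept(task.get());
                    } catch (RuntimeException e) {
                        error.compareAndSet(null, e);
                        done.set(true);
                    }
                }
            });
        }
        long total = 0L;
        try {
            // Producer loop, like consumersStart: one inner iterator per vertex.
            while (iterator.hasNext() && !done.get()) {
                total++;
                queue.put(Optional.of(iterator.next()));
            }
        } finally {
            for (int i = 0; i < workers; i++) {
                queue.put(Optional.empty()); // one stop signal per worker
            }
            pool.shutdown();
            pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        }
        if (error.get() != null) {
            throw error.get();
        }
        return total;
    }

    // Demo: three vertices with 3, 1 and 2 edges; returns {inner iterators, edges}.
    static long[] demo() {
        List<Iterator<String>> nested = List.of(
                List.of("josh->marko", "josh->ripple", "josh->lop").iterator(),
                List.of("vadas->marko").iterator(),
                List.of("lop->josh", "lop->peter").iterator());
        AtomicLong edges = new AtomicLong();
        try {
            long batches = traverseBatch(nested.iterator(),
                    it -> it.forEachRemaining(e -> edges.incrementAndGet()), 2);
            return new long[] {batches, edges.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [3, 6]
    }
}
```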
- Parallel execution is currently implemented in `Kout` and `Kneighbor`; thread safety relies mainly on the concurrency control built into `KoutRecords` and `KneighborRecords`.
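The internals of `KoutRecords` and `KneighborRecords` are not described here, so the following is only a hypothetical illustration of the kind of concurrency control such records need: when several threads expand vertices in parallel, an atomic "add if absent" on a concurrent set ensures each vertex is claimed by exactly one thread. `ConcurrentRecordsSketch`, `tryAdd`, and `race` are made-up names, not HugeGraph's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentRecordsSketch {

    // Hypothetical per-traversal records; NOT the real KoutRecords API.
    private final Set<String> visited = ConcurrentHashMap.newKeySet();
    private final Queue<String> nextLayer = new ConcurrentLinkedQueue<>();

    // add() on a concurrent key set is atomic: exactly one caller gets true
    // for a given vertex, so no vertex enters the next layer twice.
    boolean tryAdd(String vertex) {
        if (visited.add(vertex)) {
            nextLayer.add(vertex);
            return true;
        }
        return false;
    }

    int nextLayerSize() {
        return nextLayer.size();
    }

    // Demo: `threads` workers race to add the same vertices v0..v(n-1).
    // Returns {successful adds, size of the next layer}.
    static int[] race(int threads, int n) {
        ConcurrentRecordsSketch records = new ConcurrentRecordsSketch();
        AtomicInteger wins = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int t = 0; t < threads; t++) {
                futures.add(pool.submit(() -> {
                    start.await(); // release all workers together to maximise contention
                    for (int i = 0; i < n; i++) {
                        if (records.tryAdd("v" + i)) {
                            wins.incrementAndGet();
                        }
                    }
                    return null;
                }));
            }
            start.countDown();
            for (Future<?> f : futures) {
                f.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // also unblocks workers if submission failed early
        }
        return new int[] {wins.get(), records.nextLayerSize()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(race(4, 1000))); // [1000, 1000]
    }
}
```

A `ConcurrentHashMap`-backed key set is one common choice; guarding a plain `HashSet` with a lock would also be correct but serializes every insertion.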
- Request URL: `POST http://localhost:8080/graphs/hugegraph/traversers/kout`
- Request Body:
```json
{
    "source": "1:marko",
    "steps": {
        "direction": "BOTH",
        "edge_steps": [
            {
                "label": "knows",
                "properties": {}
            },
            {
                "label": "created",
                "properties": {}
            }
        ],
        "vertex_steps": [
            {
                "label": "person",
                "properties": {}
            },
            {
                "label": "software",
                "properties": {}
            }
        ],
        "max_degree": 10000,
        "skip_degree": 100000
    },
    "max_depth": 2,
    "nearest": false,
    "limit": 10000,
    "with_vertex": false,
    "with_path": true,
    "with_edge": false
}
```
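To reproduce the request, a minimal client using the JDK's `java.net.http` package (Java 11+) might look like the following. It assumes a HugeGraph server reachable at `localhost:8080` with a graph named `hugegraph`, as in the URL above; `KoutRequestSketch` is an illustrative name, and the body is the request body above written as one string.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class KoutRequestSketch {

    // Endpoint from the request above; assumes a local server on port 8080.
    static final String URL = "http://localhost:8080/graphs/hugegraph/traversers/kout";

    // The request body above, as a single JSON string.
    static final String BODY = "{"
            + "\"source\":\"1:marko\","
            + "\"steps\":{\"direction\":\"BOTH\","
            + "\"edge_steps\":[{\"label\":\"knows\",\"properties\":{}},"
            + "{\"label\":\"created\",\"properties\":{}}],"
            + "\"vertex_steps\":[{\"label\":\"person\",\"properties\":{}},"
            + "{\"label\":\"software\",\"properties\":{}}],"
            + "\"max_degree\":10000,\"skip_degree\":100000},"
            + "\"max_depth\":2,\"nearest\":false,\"limit\":10000,"
            + "\"with_vertex\":false,\"with_path\":true,\"with_edge\":false}";

    // Builds the POST request without sending it.
    static HttpRequest buildRequest() {
        return HttpRequest.newBuilder(URI.create(URL))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(BODY))
                .build();
    }

    // Sends the request and prints the status code and JSON response.
    public static void main(String[] args) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}
```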
- Response of the serial Kout POST:
```json
{
    "kout": [
        "1:peter",
        "1:josh",
        "2:ripple",
        "2:lop"
    ],
    "size": 4,
    "paths": [
        {
            "objects": [
                "1:marko",
                "2:lop",
                "1:josh"
            ]
        },
        {
            "objects": [
                "1:marko",
                "2:lop",
                "1:peter"
            ]
        },
        {
            "objects": [
                "1:marko",
                "1:josh",
                "2:ripple"
            ]
        },
        {
            "objects": [
                "1:marko",
                "1:josh",
                "2:lop"
            ]
        }
    ],
    ...,
    "measure": {
        "edge_iterations": 10,
        "vertice_iterations": 4,
        "cost(ns)": 329856375
    }
}
```
- Response of the parallel Kout POST:
```json
{
    "kout": [
        "1:peter",
        "1:josh",
        "2:ripple",
        "2:lop"
    ],
    "size": 4,
    "paths": [
        {
            "objects": [
                "1:marko",
                "2:lop",
                "1:josh"
            ]
        },
        {
            "objects": [
                "1:marko",
                "2:lop",
                "1:peter"
            ]
        },
        {
            "objects": [
                "1:marko",
                "1:josh",
                "2:ripple"
            ]
        },
        {
            "objects": [
                "1:marko",
                "1:josh",
                "2:lop"
            ]
        }
    ],
    ...,
    "measure": {
        "edge_iterations": 10,
        "vertice_iterations": 4,
        "cost(ns)": 191413166
    }
}
```
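- Across tests on multiple data sets, no correctness problems were found.
- The current data sets are small, so it is not yet possible to tell whether parallel execution gives a significant speedup.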
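Because concurrent execution does not guarantee output order, a correctness check between serial and parallel runs should compare the returned vertices as sets rather than as ordered lists. The sketch below does that for the two responses above and also computes the ratio of their `cost(ns)` values. With one run each on a six-vertex graph, that ratio (about 1.72) is not evidence of a speedup, which matches the conclusion above. `KoutResultCheckSketch`, `sameVertices`, and `costRatio` are illustrative names.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Locale;

public class KoutResultCheckSketch {

    // "kout" vertex lists copied from the serial and parallel responses above.
    static final List<String> SERIAL = List.of("1:peter", "1:josh", "2:ripple", "2:lop");
    static final List<String> PARALLEL = List.of("1:peter", "1:josh", "2:ripple", "2:lop");

    // "cost(ns)" values from the two responses above (a single run each).
    static final long SERIAL_NS = 329_856_375L;
    static final long PARALLEL_NS = 191_413_166L;

    // Order-insensitive comparison: parallel workers may emit results in any order.
    static boolean sameVertices(List<String> a, List<String> b) {
        return a.size() == b.size() && new HashSet<>(a).equals(new HashSet<>(b));
    }

    // Serial time divided by parallel time for the single measured run.
    static double costRatio() {
        return (double) SERIAL_NS / PARALLEL_NS;
    }

    public static void main(String[] args) {
        System.out.println(sameVertices(SERIAL, PARALLEL));              // true
        System.out.printf(Locale.ROOT, "%.2f%n", costRatio());           // 1.72
    }
}
```

For a meaningful speedup figure, the same comparison would need repeated runs on a graph large enough that thread scheduling overhead is small relative to the traversal work.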