feature: support partition concurrent compute (#177)
corgiboygsj committed May 19, 2022
1 parent 0d2e6df commit bcaef8563f15730b37b4d445cdf97a1f25c1e39f
Showing 14 changed files with 467 additions and 104 deletions.
@@ -21,45 +21,58 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

import org.slf4j.Logger;

import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.partition.PartitionStat;
import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.manager.Managers;
import com.baidu.hugegraph.computer.core.network.message.MessageType;
import com.baidu.hugegraph.computer.core.receiver.MessageRecvManager;
import com.baidu.hugegraph.computer.core.receiver.MessageStat;
import com.baidu.hugegraph.computer.core.sender.MessageSendManager;
import com.baidu.hugegraph.computer.core.sort.flusher.PeekableIterator;
import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntry;
import com.baidu.hugegraph.computer.core.worker.Computation;
import com.baidu.hugegraph.computer.core.worker.ComputationContext;
import com.baidu.hugegraph.computer.core.util.Consumers;
import com.baidu.hugegraph.computer.core.worker.WorkerContext;
import com.baidu.hugegraph.computer.core.worker.WorkerStat;
import com.baidu.hugegraph.util.ExecutorUtil;
import com.baidu.hugegraph.util.Log;

public class ComputeManager<M extends Value> {
public class ComputeManager {

private static final Logger LOG = Log.logger(ComputeManager.class);
private static final String PREFIX = "partition-compute-executor-%s";

private final ComputerContext context;
private final Managers managers;

private final Map<Integer, FileGraphPartition<M>> partitions;
private final Computation<M> computation;
private final Map<Integer, FileGraphPartition> partitions;
private final MessageRecvManager recvManager;
private final MessageSendManager sendManager;
private final ExecutorService computeExecutor;

public ComputeManager(ComputerContext context, Managers managers,
Computation<M> computation) {
public ComputeManager(ComputerContext context, Managers managers) {
this.context = context;
this.managers = managers;
this.computation = computation;
this.partitions = new HashMap<>();
this.recvManager = this.managers.get(MessageRecvManager.NAME);
this.sendManager = this.managers.get(MessageSendManager.NAME);

int computeThreadNum = this.partitionComputeThreadNum(context.config());
this.computeExecutor = ExecutorUtil.newFixedThreadPool(
computeThreadNum, PREFIX);
LOG.info("Created partition compute thread poll, thread num: {}",
computeThreadNum);
}

private Integer partitionComputeThreadNum(Config config) {
return config.get(ComputerOptions.PARTITIONS_COMPUTE_THREAD_NUMS);
}
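
Aside: ExecutorUtil.newFixedThreadPool is HugeGraph's own helper; as a rough sketch, it behaves like a standard fixed-size pool whose threads are named from the PREFIX pattern. The factory below is illustrative only, not the actual ExecutorUtil code, and whether threads are daemonized is an assumption.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;

    // Rough standard-library equivalent of
    // ExecutorUtil.newFixedThreadPool(num, "partition-compute-executor-%s").
    final class NamedFixedPool {
        static ExecutorService create(int threads, String prefix) {
            AtomicInteger seq = new AtomicInteger();
            return Executors.newFixedThreadPool(threads, runnable -> {
                Thread t = new Thread(runnable);
                // Name threads "partition-compute-executor-0", "-1", ...
                t.setName(String.format(prefix, seq.getAndIncrement()));
                t.setDaemon(true); // assumption: worker pool threads are daemonized
                return t;
            });
        }
    }
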

public WorkerStat input() {
@@ -70,6 +83,7 @@ public WorkerStat input() {
this.recvManager.vertexPartitions();
Map<Integer, PeekableIterator<KvEntry>> edges =
this.recvManager.edgePartitions();

// TODO: parallel input process
for (Map.Entry<Integer, PeekableIterator<KvEntry>> entry :
vertices.entrySet()) {
@@ -80,9 +94,9 @@ public WorkerStat input() {
partition,
PeekableIterator.emptyIterator());

FileGraphPartition<M> part = new FileGraphPartition<>(this.context,
this.managers,
partition);
FileGraphPartition part = new FileGraphPartition(this.context,
this.managers,
partition);
PartitionStat partitionStat = null;
ComputerException inputException = null;
try {
@@ -123,30 +137,45 @@ public WorkerStat input() {
public void takeRecvedMessages() {
Map<Integer, PeekableIterator<KvEntry>> messages =
this.recvManager.messagePartitions();
for (FileGraphPartition<M> partition : this.partitions.values()) {
for (FileGraphPartition partition : this.partitions.values()) {
partition.messages(messages.get(partition.partition()));
}
}

public WorkerStat compute(ComputationContext context, int superstep) {
public WorkerStat compute(WorkerContext context, int superstep) {
this.sendManager.startSend(MessageType.MSG);

WorkerStat workerStat = new WorkerStat();
Map<Integer, PartitionStat> partitionStats = new HashMap<>(
this.partitions.size());
// TODO: parallel compute process.
for (FileGraphPartition<M> partition : this.partitions.values()) {
PartitionStat stat = partition.compute(context,
this.computation,
superstep);
partitionStats.put(stat.partitionId(), stat);
Map<Integer, PartitionStat> stats = new ConcurrentHashMap<>();

/*
* Remark: the main thread can perceive a partition compute exception
* only after all partition computes have completed, and only the last
* exception is recorded.
*/
Consumers<FileGraphPartition> consumers =
new Consumers<>(this.computeExecutor, partition -> {
PartitionStat stat = partition.compute(context,
superstep);
stats.put(stat.partitionId(), stat);
});
consumers.start("partition-compute");

try {
for (FileGraphPartition partition : this.partitions.values()) {
consumers.provide(partition);
}
consumers.await();
} catch (Throwable t) {
throw new ComputerException("An exception occurred when " +
"partition parallel compute", t);
}

this.sendManager.finishSend(MessageType.MSG);

// After compute finishes and the finish signal is sent.
Map<Integer, MessageStat> recvStats = this.recvManager.messageStats();
for (Map.Entry<Integer, PartitionStat> entry :
partitionStats.entrySet()) {
for (Map.Entry<Integer, PartitionStat> entry : stats.entrySet()) {
PartitionStat partStat = entry.getValue();
int partitionId = partStat.partitionId();

@@ -165,10 +194,14 @@ public WorkerStat compute(ComputationContext context, int superstep) {

public void output() {
// TODO: write results back in parallel
for (FileGraphPartition<M> partition : this.partitions.values()) {
for (FileGraphPartition partition : this.partitions.values()) {
PartitionStat stat = partition.output();
LOG.info("Output partition {} complete, stat='{}'",
partition.partition(), stat);
}
}

public void close() {
this.computeExecutor.shutdown();
}
}
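
For readers who don't have the Consumers class at hand: the compute() rewrite above is a fan-out/await pattern in which the main thread feeds partitions to pool-backed consumers, per-partition stats land in a ConcurrentHashMap, and, per the remark, only the last failure surfaces after everything finishes. Below is a self-contained sketch of that pattern using only java.util.concurrent; Consumers is HugeGraph's own utility, so its exact semantics here are an assumption, and computeOne() is a hypothetical stand-in for FileGraphPartition.compute().

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.atomic.AtomicReference;

    // Sketch: one task per partition on the shared compute pool; stats are
    // collected concurrently; the main thread learns of failures only after
    // all partitions finish, and only the last exception is kept.
    final class ParallelComputeSketch {
        static Map<Integer, Long> computeAll(ExecutorService pool,
                                             List<Integer> partitions)
                                             throws InterruptedException {
            Map<Integer, Long> stats = new ConcurrentHashMap<>();
            AtomicReference<Throwable> lastError = new AtomicReference<>();
            CountDownLatch done = new CountDownLatch(partitions.size());
            for (Integer partition : partitions) {
                pool.submit(() -> {
                    try {
                        stats.put(partition, computeOne(partition));
                    } catch (Throwable t) {
                        lastError.set(t); // later failures overwrite earlier ones
                    } finally {
                        done.countDown();
                    }
                });
            }
            done.await();
            if (lastError.get() != null) {
                throw new RuntimeException("partition parallel compute failed",
                                           lastError.get());
            }
            return stats;
        }

        // Hypothetical stand-in for FileGraphPartition.compute(context, superstep).
        static long computeOne(int partition) {
            return 0L;
        }
    }
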
@@ -23,6 +23,8 @@
import java.io.IOException;
import java.util.Iterator;

import org.slf4j.Logger;

import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.common.Constants;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
@@ -47,16 +49,21 @@
import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.Pointer;
import com.baidu.hugegraph.computer.core.worker.Computation;
import com.baidu.hugegraph.computer.core.worker.ComputationContext;
import com.baidu.hugegraph.computer.core.worker.WorkerContext;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;

public class FileGraphPartition {

public class FileGraphPartition<M extends Value> {
private static final Logger LOG = Log.logger(FileGraphPartition.class);

private static final String VERTEX = "vertex";
private static final String EDGE = "edge";
private static final String STATUS = "status";
private static final String VALUE = "value";

private final ComputerContext context;
private final Computation<Value> computation;
private final FileGenerator fileGenerator;
private final int partition;

@@ -79,12 +86,16 @@

private VertexInput vertexInput;
private EdgesInput edgesInput;
private MessageInput<M> messageInput;
private MessageInput<Value> messageInput;

public FileGraphPartition(ComputerContext context,
Managers managers,
int partition) {
this.context = context;
this.computation = context.config()
.createObject(
ComputerOptions.WORKER_COMPUTATION_CLASS);
this.computation.init(context.config());
this.fileGenerator = managers.get(FileManager.NAME);
this.partition = partition;
this.vertexFile = new File(this.fileGenerator.randomDirectory(VERTEX));
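
Note the ownership change above: the Computation instance is no longer passed into ComputeManager and shared; each FileGraphPartition builds and initializes its own from WORKER_COMPUTATION_CLASS. A plausible reason (my inference, the commit does not say) is that Computation implementations may hold mutable per-superstep state, which would race once partitions compute on separate threads. A tiny hypothetical illustration:

    // Hypothetical computation with mutable per-superstep state. Shared across
    // partition threads, activeInSuperstep would race; one instance per
    // FileGraphPartition (as constructed above) keeps it thread-confined.
    final class StatefulComputationSketch {
        private long activeInSuperstep; // mutable, not thread-safe

        void beforeSuperstep() { this.activeInSuperstep = 0L; }
        void computeVertex()   { this.activeInSuperstep++; }
        long afterSuperstep()  { return this.activeInSuperstep; }
    }
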
@@ -121,9 +132,10 @@ protected PartitionStat input(PeekableIterator<KvEntry> vertices,
this.edgeCount, 0L);
}

protected PartitionStat compute(ComputationContext context,
Computation<M> computation,
protected PartitionStat compute(WorkerContext context,
int superstep) {
LOG.info("Partition {} begin compute in superstep {}",
this.partition, superstep);
try {
this.beforeCompute(superstep);
} catch (IOException e) {
@@ -134,9 +146,11 @@ protected PartitionStat compute(ComputationContext context,

long activeVertexCount;
try {
this.computation.beforeSuperstep(context);
activeVertexCount = superstep == 0 ?
this.compute0(context, computation) :
this.compute1(context, computation, superstep);
this.compute0(context) :
this.compute1(context);
this.computation.afterSuperstep(context);
} catch (Exception e) {
throw new ComputerException(
"Error occurred when compute at superstep %s",
@@ -151,14 +165,16 @@ protected PartitionStat compute(ComputationContext context,
e, superstep);
}

LOG.info("Partition {} finish compute in superstep {}",
this.partition, superstep);

return new PartitionStat(this.partition, this.vertexCount,
this.edgeCount,
this.vertexCount - activeVertexCount);
}


private long compute0(ComputationContext context,
Computation<M> computation) {
private long compute0(ComputationContext context) {
long activeVertexCount = 0L;
while (this.vertexInput.hasNext()) {
Vertex vertex = this.vertexInput.next();
@@ -167,7 +183,7 @@ private long compute0(ComputationContext context,
Edges edges = this.edgesInput.edges(this.vertexInput.idPointer());
vertex.edges(edges);

computation.compute0(context, vertex);
this.computation.compute0(context, vertex);

if (vertex.active()) {
activeVertexCount++;
@@ -183,18 +199,16 @@ private long compute0(ComputationContext context,
return activeVertexCount;
}

private long compute1(ComputationContext context,
Computation<M> computation,
int superstep) {
private long compute1(ComputationContext context) {
Value result = this.context.config().createObject(
ComputerOptions.ALGORITHM_RESULT_CLASS);
long activeVertexCount = 0L;
while (this.vertexInput.hasNext()) {
Vertex vertex = this.vertexInput.next();
this.readVertexStatusAndValue(vertex, result);

Iterator<M> messageIter = this.messageInput.iterator(
this.vertexInput.idPointer());
Iterator<Value> messageIter = this.messageInput.iterator(
this.vertexInput.idPointer());
if (messageIter.hasNext()) {
vertex.reactivate();
}
@@ -207,7 +221,7 @@ private long compute1(ComputationContext context,
Edges edges = this.edgesInput.edges(
this.vertexInput.idPointer());
vertex.edges(edges);
computation.compute(context, vertex, messageIter);
this.computation.compute(context, vertex, messageIter);
}

// The vertex status may be changed after computation
@@ -433,6 +433,14 @@ public static synchronized ComputerOptions instance() {
1
);

public static final ConfigOption<Integer> PARTITIONS_COMPUTE_THREAD_NUMS =
new ConfigOption<>(
"job.partitions_thread_nums",
"The number of threads for partition parallel compute.",
positiveInt(),
4
);
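
A usage note for the new option: with the default of 4, each worker computes at most four partitions at a time. Assuming the property-style job configuration implied by the option name, tuning it would look like:

    job.partitions_thread_nums=8

A value near the worker's core count is a reasonable starting point, though partition compute mixes CPU work with file I/O, so the best value likely depends on both; that sizing advice is a general heuristic, not guidance from this commit.
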

public static final ConfigOption<Integer> BSP_MAX_SUPER_STEP =
new ConfigOption<>(
"bsp.max_super_step",
@@ -92,5 +92,6 @@ public void loadGraph() {
this.sendManager.sendEdge(vertex);
}
this.sendManager.finishSend(MessageType.EDGE);
this.sendManager.clearBuffer();
}
}
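
The new clearBuffer() call after finishSend(MessageType.EDGE) presumably resets the per-thread send buffers allocated during graph load (see MessageSendBuffers.clear() below), so the later message-sending phase starts from a clean slate; that reading is my inference from this diff, not stated in the commit.
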
@@ -87,7 +87,7 @@ public void connect(int workerId, String hostname, int dataPort) {
this.sender.addWorkerClient(workerId, client);
} catch (TransportException e) {
throw new ComputerException(
"Failed to connect to worker: {}({}:{})",
"Failed to connect to worker: %s(%s:%s)",
workerId, hostname, dataPort);
}
}
@@ -34,7 +34,7 @@ public class MessageSendBuffers {
* Add a MessageSendPartition class when we find that we really need it
* to encapsulate more objects.
*/
private final WriteBuffers[] buffers;
private final MessageSendPartition[] buffers;

public MessageSendBuffers(ComputerContext context) {
Config config = context.config();
@@ -43,28 +43,35 @@ public MessageSendBuffers(ComputerContext context) {
ComputerOptions.WORKER_WRITE_BUFFER_THRESHOLD);
int capacity = config.get(
ComputerOptions.WORKER_WRITE_BUFFER_INIT_CAPACITY);
this.buffers = new WriteBuffers[partitionCount];
this.buffers = new MessageSendPartition[partitionCount];
for (int i = 0; i < partitionCount; i++) {
/*
* It depends on the concrete implementation of the
* partition algorithm, which is not elegant.
*/
this.buffers[i] = new WriteBuffers(context, threshold, capacity);
this.buffers[i] = new MessageSendPartition(context, threshold,
capacity);
}
}

public WriteBuffers get(int partitionId) {
if (partitionId < 0 || partitionId >= this.buffers.length) {
throw new ComputerException("Invalid partition id %s", partitionId);
}
return this.buffers[partitionId];
return this.buffers[partitionId].buffersForCurrentThread();
}

public Map<Integer, WriteBuffers> all() {
Map<Integer, WriteBuffers> all = InsertionOrderUtil.newMap();
public Map<Integer, MessageSendPartition> all() {
Map<Integer, MessageSendPartition> all = InsertionOrderUtil.newMap();
for (int i = 0; i < this.buffers.length; i++) {
all.put(i, this.buffers[i]);
}
return all;
}

public void clear() {
for (MessageSendPartition partition : this.buffers) {
partition.clear();
}
}
}
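
get() now routes through buffersForCurrentThread(), which suggests each MessageSendPartition hands every sender thread its own WriteBuffers so that concurrently computing partitions can emit messages without locking a shared buffer. Since MessageSendPartition itself is not shown in this diff, the sketch below is an assumption about its shape, using ThreadLocal for illustration and StringBuilder as a stand-in for WriteBuffers.

    import java.util.ArrayList;
    import java.util.List;

    // Assumed shape of MessageSendPartition (its source is not in this diff):
    // each calling thread gets a private buffer via ThreadLocal, so concurrent
    // partition threads never contend on a single WriteBuffers instance.
    final class PerThreadBuffersSketch {
        private final List<StringBuilder> all = new ArrayList<>();
        private final ThreadLocal<StringBuilder> current =
                ThreadLocal.withInitial(this::newBuffer);

        private synchronized StringBuilder newBuffer() {
            StringBuilder buffer = new StringBuilder();
            this.all.add(buffer);
            return buffer;
        }

        StringBuilder buffersForCurrentThread() {
            return this.current.get();
        }

        synchronized void clear() {
            for (StringBuilder buffer : this.all) {
                buffer.setLength(0); // reset contents, keep per-thread mapping
            }
        }
    }
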
